-
Notifications
You must be signed in to change notification settings - Fork 3.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
WIP: Native STOMP #9141
base: main
Are you sure you want to change the base?
WIP: Native STOMP #9141
Conversation
7856889
to
41ce7a9
Compare
41ce7a9
to
cb0eda2
Compare
cb0eda2
to
64b59ef
Compare
Regarding Connection closure, UTF, queues utilization - the idea is to merge this first, then tweak standard support and resources. |
|
||
%% {ok, BasicMessage} = rabbit_basic:message(ExchangeName, RoutingKey, Content), | ||
|
||
%% Delivery = #delivery{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Delete any commented code.
%% when SendFun :: fun((atom(), binary()) -> term()), | ||
%% AdapterInfo :: #amqp_adapter_info{}, | ||
%% SSLLoginName :: atom() | binary(), | ||
%% PeerAddr :: inet:ip_address(). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Uncomment the type specs and make sure dialyzer passes.
Thank you @ikvmw. Overall this PR goes into the right direction. There is some more work to do to make it production ready in order to merge it into
creates a quorum queue on
creates a quorum queue on There is even a test file called Same applies to streams. |
deps/rabbitmq_stomp/BUILD.bazel
Outdated
@@ -4,6 +4,7 @@ load("@rules_erlang//:dialyze.bzl", "dialyze", "plt") | |||
load( | |||
"//:rabbitmq.bzl", | |||
"BROKER_VERSION_REQUIREMENTS_ANY", | |||
"ENABLE_FEATURE_MAYBE_EXPR", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We will remove enabling the maybe feature at runtime for 3.13 as 3.13 requires OTP 26.
However, it's needed right now.
@@ -28,6 +28,7 @@ | |||
start(normal, []) -> | |||
Config = parse_configuration(), | |||
Listeners = parse_listener_configuration(), | |||
rabbit_global_counters:init([{protocol, stomp}]), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd vote to differentiate by STOMP versions, i.e. 1.0, 1.1, 1.2 since that's what we currently do for MQTT (3.1, 3.1.1, 5.0).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Where are the global counters per protocol and queue type initialised? Are they missing?
@@ -140,21 +164,32 @@ initial_state(Configuration, | |||
%% to override this value? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Delete the adapter info. It's not needed anymore.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The management UI shows that there is 1 AMQP 0.9.1 channel for the STOMP connection, which is wrong.
SMsgSeqNos = lists:usort(MsgSeqNos), | ||
UnconfirmedCutoff = case rabbit_confirms:is_empty(UC) of | ||
true -> lists:last(SMsgSeqNos) + 1; | ||
false -> rabbit_confirms:smallest(UC) | ||
end, | ||
Cutoff = lists:min([UnconfirmedCutoff | NegativeMsgSeqNos]), | ||
{Ms, Ss} = lists:splitwith(fun(X) -> X < Cutoff end, SMsgSeqNos), | ||
State1 = case Ms of | ||
[] -> State; | ||
_ -> MkMsgFun(lists:last(Ms), true, State) | ||
end, | ||
lists:foldl(fun(SeqNo, StateN) -> | ||
MkMsgFun(SeqNo, false, StateN) | ||
end, State1, Ss). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Where is this code tested? If it's not tested, please add tests.
{eol, Actions} -> | ||
State1 = handle_queue_actions(Actions, State), | ||
State2 = handle_consuming_queue_down_or_eol(QRef, State1), | ||
{ConfirmMXs, UC1} = | ||
rabbit_confirms:remove_queue(QRef, State1#proc_state.unconfirmed), | ||
%% Deleted queue is a special case. | ||
%% Do not nack the "rejected" messages. | ||
State3 = record_confirms(ConfirmMXs, | ||
State2#proc_state{unconfirmed = UC1}), | ||
{ok, State3#proc_state{queue_states = rabbit_queue_type:remove(QRef, QStates0)}}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Where is this code tested? If it's not tested, please add tests.
({rejected, _QRef, MsgSeqNos}, S0) -> | ||
{U, Rej} = | ||
lists:foldr( | ||
fun(SeqNo, {U1, Acc}) -> | ||
case rabbit_confirms:reject(SeqNo, U1) of | ||
{ok, MX, U2} -> | ||
{U2, [MX | Acc]}; | ||
{error, not_found} -> | ||
{U1, Acc} | ||
end | ||
end, {S0#proc_state.unconfirmed, []}, MsgSeqNos), | ||
S = S0#proc_state{unconfirmed = U}, | ||
%% Don't send anything, no nacks in STOMP | ||
record_rejects(Rej, S); | ||
({queue_down, QRef}, S0) -> | ||
handle_consuming_queue_down_or_eol(QRef, S0); | ||
%% TODO: I have no idea about the scope of credit_flow | ||
({block, QName}, S0) -> | ||
credit_flow:block(QName), | ||
S0; | ||
({unblock, QName}, S0) -> | ||
credit_flow:unblock(QName), | ||
S0; | ||
%% TODO: in rabbit_channel there code for handling | ||
%% send_drained and send_credit_reply | ||
%% I'm doing catch all here to not crash? | ||
(_, S0) -> | ||
S0 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Where is this code tested? If it's not tested, please add tests.
parse_endpoint(undefined) -> | ||
parse_endpoint("/queue"); | ||
parse_endpoint(Destination) when is_binary(Destination) -> | ||
parse_endpoint(unicode:characters_to_list(Destination)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Where is this code tested? If it's not tested, please add tests.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
parse_endpoint called on each SEND and SUBSCRIBE. Should be many dozens hits across our test suites
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rabbit_core_metrics:channel_queue_down({self(), QName}), | ||
erase({queue_stats, QName}), | ||
[begin | ||
rabbit_core_metrics:channel_queue_exchange_down({self(), QX}), | ||
erase({queue_exchange_stats, QX}) | ||
end || {{queue_exchange_stats, QX = {QName0, _}}, _} <- get(), | ||
QName0 =:= QName]. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why are these stats erased? It looks like there aren't written in the first place?
|
||
maybe | ||
{ok, User} ?= rabbit_access_control:check_user_login(Username, AuthProps), | ||
{ok, AuthzCtx} ?= check_vhost_access(VHost, User, Addr), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we additionally check for whether the vhost exists and whether the vhost is alive? (see Native MQTT).
end, | ||
|
||
maybe | ||
{ok, User} ?= rabbit_access_control:check_user_login(Username, AuthProps), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rabbit_core_metrics:auth_attempt_failed()
is missing if user login is refused.
-record(subscription, {dest_hdr, ack_mode, multi_ack, description, queue_name}). | ||
|
||
-record(pending_ack, { | ||
%% delivery identifier used by clients | ||
%% to acknowledge and reject deliveries | ||
delivery_tag, | ||
%% consumer tag | ||
tag, | ||
delivered_at, | ||
%% queue name | ||
queue, | ||
%% message ID used by queue and message store implementations | ||
msg_id | ||
}). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add type specs
Delivery = #'basic.deliver'{consumer_tag = ConsumerTag, | ||
delivery_tag = DeliveryTag, | ||
redelivered = Redelivered, | ||
exchange = ExchangeNameBin, | ||
routing_key = RoutingKey}, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Creating an AMQP 0.9.1 #basic.deliver{}
is unnecessary. The STOMP protocol is used to send the message to the client, not the AMQP 0.9.1 protocol. I think you should restructure the code, specifically rabbit_stomp_util:headers_extra/4
should not accept a #basic.deliver{}
anymore.
end; | ||
{ExchangeNameList, RoutingKeyList} = parse_routing(Destination, DfltTopicEx), | ||
%% io:format("Parse_routing: ~p~n", [{ExchangeNameList, RoutingKeyList}]), | ||
RoutingKey = list_to_binary(RoutingKeyList), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The STOMP plugin operates too much on lists instead of binaries which is inefficient. At some point we should improve this. However, this should be a follow up PR.
|
||
-record(subscription, {dest_hdr, ack_mode, multi_ack, description}). | ||
-record(proc_state, {session_id, subscriptions, | ||
version, start_heartbeat_fun, pending_receipts, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My editor shows that start_heartbeat_fun
is an unused field. Where is this field used? Remove any unused fields.
queue, | ||
%% message ID used by queue and message store implementations | ||
msg_id | ||
}). | ||
|
||
-define(FLUSH_TIMEOUT, 60000). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove unused macro
@@ -20,10 +20,6 @@ | |||
-include("rabbit_stomp_frame.hrl"). | |||
-include_lib("amqp_client/include/amqp_client.hrl"). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove
{error, queue_limit_exceeded} | ||
end. | ||
|
||
routing_init_state() -> sets:new(). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use v2
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what is v2?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
https://www.erlang.org/doc/man/sets.html#description
Erlang/OTP 24.0 introduced a new internal representation for sets which is more performant. Developers can use this new representation by passing the {version, 2} flag to new/1 and from_list/2, such as sets:new([{version, 2}]). This new representation will become the default in future Erlang/OTP versions. Functions that work on two sets, such as union/2 and similar, will work with sets of different versions. In such cases, there is no guarantee about the version of the returned set. Explicit conversion from the old version to the new one can be done with sets:from_list(sets:to_list(Old), [{version,2}]).
#'queue.delete'{queue = list_to_binary(QName), | ||
nowait = false}), | ||
QRes = rabbit_misc:r(VHost, queue, list_to_binary(QName)), | ||
io:format("Durable QRes: ~p~n", [QRes]), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
delete
case rabbit_amqqueue:with( | ||
QRes, | ||
fun (Q) -> | ||
io:format("Delete queue ~p~n", [rabbit_queue_type:delete(Q, false, false, Username)]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use the logger if you want to log something.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
missing access control check for queue deletion
Binding = #binding{source = rabbit_misc:r(VHost, exchange, list_to_binary(Exchange)), | ||
destination = QName, | ||
key = list_to_binary(RoutingKey)}, | ||
case rabbit_binding:add(Binding, Username) of |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
missing access control checks
credit_flow:block(QName), | ||
S0; | ||
({unblock, QName}, S0) -> | ||
credit_flow:unblock(QName), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It makes probably more sense to do what has been done for Native MQTT, i.e .not using credit_flow
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, not using credit flow has worked well so far for MQTT, so let's just drop it
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
that's great! Flow was on my "mysteries" list
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hmm mqtt_processor still uses credit_flow
- https://github.com/rabbitmq/rabbitmq-server/blob/main/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl#L1914.
|
||
Destination = binary_to_list(Queue), | ||
Spec = #{no_ack => true, | ||
prefetch_count => application:get_env(rabbit, default_consumer_prefetch), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wrong prefetch count.
It leads to crashes later on.
e.g.
SEND
destination:/queue/reply-test
reply-to:/temp-queue/foo
Hello World!^@
then click on the reply-to-queue in the Management UI. You get a 500 error and RabbitMQ logs:
2023-09-06 14:10:00.942131+00:00 [error] <0.1278.0> crasher:
2023-09-06 14:10:00.942131+00:00 [error] <0.1278.0> initial call: cowboy_stream_h:request_process/3
2023-09-06 14:10:00.942131+00:00 [error] <0.1278.0> pid: <0.1278.0>
2023-09-06 14:10:00.942131+00:00 [error] <0.1278.0> registered_name: []
2023-09-06 14:10:00.942131+00:00 [error] <0.1278.0> exception error: no function clause matching
2023-09-06 14:10:00.942131+00:00 [error] <0.1278.0> thoas_encode:value({ok,{false,0}},
2023-09-06 14:10:00.942131+00:00 [error] <0.1278.0> #Fun<thoas_encode.0.30747453>) (src/thoas_encode.erl, line 1720)
2023-09-06 14:10:00.942131+00:00 [error] <0.1278.0> in function thoas_encode:map_naive_loop/2 (src/thoas_encode.erl, line 1712)
2023-09-06 14:10:00.942131+00:00 [error] <0.1278.0> in call from thoas_encode:map_naive_loop/2 (src/thoas_encode.erl, line 1713)
2023-09-06 14:10:00.942131+00:00 [error] <0.1278.0> in call from thoas_encode:map_naive/2 (src/thoas_encode.erl, line 1704)
2023-09-06 14:10:00.942131+00:00 [error] <0.1278.0> in call from thoas_encode:list/2 (src/thoas_encode.erl, line 1692)
2023-09-06 14:10:00.942131+00:00 [error] <0.1278.0> in call from thoas_encode:map_naive/2 (src/thoas_encode.erl, line 1703)
2023-09-06 14:10:00.942131+00:00 [error] <0.1278.0> in call from thoas:encode/2 (src/thoas.erl, line 92)
2023-09-06 14:10:00.942131+00:00 [error] <0.1278.0> in call from rabbit_mgmt_util:reply0/3 (rabbit_mgmt_util.erl, line 252)
websocket_info(connection_created, State) -> | ||
Infos = infos(?INFO_ITEMS ++ ?OTHER_METRICS, State), | ||
|
||
?LOG_INFO("Connection created infos ~p", [Infos]), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Logging the connection infos at info
level of every Web STOMP connection pollutes the logs too much.
@@ -233,21 +204,37 @@ websocket_info({start_heartbeats, {SendTimeout, ReceiveTimeout}}, | |||
ReceiveFun = fun() -> Self ! client_timeout end, | |||
Heartbeat = rabbit_heartbeat:start(SupPid, Sock, SendTimeout, | |||
SendFun, ReceiveTimeout, ReceiveFun), | |||
{ok, State#state{heartbeat = Heartbeat}}; | |||
{ok, State#state{heartbeat = Heartbeat, | |||
timeout_sec = {0, 0}}}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
{0, 0}
is wrong?
%% a map of queue names to consumer tag lists | ||
queue_consumers, unacked_message_q, vhost, | ||
user, queue_states, delivery_tag = 0, msg_seq_no, delivery_flow, | ||
default_topic_exchange, default_nack_requeue}). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's put rarely changing fields into a their own sub record (e.g. as done for thecfg
field in
-record(ch, {cfg :: #conf{}, |
|
||
Message0 = mc_amqpl:message(ExchangeName, RoutingKey, Content0), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it would be great to directly store STOMP messages as part of this PR and to implement a mc_stomp
module that does the translations. It's unnecessary to convert from / to AMQP 0.9.1 message format. But it can also be done as a separate PR.
Using a simple test app, it seems like publishing is significantly slower than on
This branch:
(note there's a 1 second sleep in the app, to avoid premature app termination, before all messages were sent) |
Messages published over STOMP, can't be consumed over AMQP 0.9.1.
logs:
|
There's a heartbeat related crash when publishing more than a couple messages.
leads to
If I change 10000 to 10, there's no stacktrace. NOTE: the same happens on |
Some performance observations at this point. 100 publishers -> 100 queues -> 100 consumers (note: there's no flow control here): Memory usage with 1000 consumers connected ( However, single connection/queue consumption performance seems degraded. |
NOTE: Bazel tests pass locally, I don't like the code yet though.
TODO: Double check Management UI correctly shows Connection information.
Proposed Changes
Don't use AMQP 0-9-1 as backend for STOMP.
Types of Changes
What types of changes does your code introduce to this project?
Put an
x
in the boxes that applyChecklist
Put an
x
in the boxes that apply.You can also fill these out after creating the PR.
If you're unsure about any of them, don't hesitate to ask on the mailing list.
We're here to help!
This is simply a reminder of what we are going to look for before merging your code.
CONTRIBUTING.md
documentFurther Comments
Documentation update: rabbitmq/rabbitmq-website#1713