Handle database timeouts in MQTT queue deletion (backport #12317) #12320

Merged
merged 3 commits into from
Sep 17, 2024


@mergify mergify bot commented Sep 16, 2024

This fixes some crashes reported when using MQTT with Khepri, spotted by @mkuratczyk. With an OMQ stress test:

omq mqtt --uri mqtt://localhost:1883 --uri mqtt://localhost:1884 --uri mqtt://localhost:1885 -x 10000 -y 10000 -r 1 --publish-to 'sensor/%d' --consume-from '/topic/sensor/%d' --mqtt-consumer-qos 1 --mqtt-publisher-qos 1

while a cluster restarts (`make restart-cluster`), we would see `badmatch` errors from matching on `{ok, _}` for `rabbit_queue_type:delete/4` and exits for `{normal, {gen_server2, call, [Pid, consumers, infinity]}}`. That stress test causes queue churn because MQTT with QoS 1 creates transient exclusive classic queues. Restarting a node causes a large number of queues to be deleted, which can overload Khepri and lead to timeouts.

The first commit refactors `rabbit_queue_type:delete/4` to return `{error, timeout}` for timeout errors. `{error, timeout}` could already be returned and is handled in `rabbit_amqqueue:delete_with/4`. This change is just for consistency: in some places we returned a `protocol_error` record instead. The second commit handles the `{error, timeout}` result in `rabbit_mqtt_processor`.
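As a rough illustration of why matching only on `{ok, _}` crashes and how a caller can handle the timeout instead, here is a minimal sketch. It does not reproduce the actual `rabbit_mqtt_processor` change: the module name, both function names, and the `DeleteFun` argument (a stand-in for a call such as `rabbit_queue_type:delete/4`) are hypothetical.

```erlang
-module(delete_timeout_sketch).
-export([delete_strict/1, delete_tolerant/1]).

%% DeleteFun is a hypothetical zero-arity fun standing in for a queue
%% deletion call. It is assumed to return {ok, Count} or {error, timeout}.

%% Asserts success: any other result raises a badmatch error in the caller.
delete_strict(DeleteFun) ->
    {ok, _Count} = DeleteFun(),
    ok.

%% Handles the timeout explicitly so the caller can decide what to do
%% (log, retry, close the connection) instead of crashing.
delete_tolerant(DeleteFun) ->
    case DeleteFun() of
        {ok, _Count} ->
            ok;
        {error, timeout} = Err ->
            Err
    end.
```

With a fun that returns `{error, timeout}`, `delete_strict/1` fails with `{badmatch, {error, timeout}}`, while `delete_tolerant/1` returns the error tuple to its caller.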

Also included is a fix for `rabbit_amqqueue:consumers/1` to catch exits: an exit can happen if another process asks for a classic queue's consumers while it is terminating. (With Khepri the terminate callback can take some time as it calls `rabbit_amqqueue:internal_delete/2`.)
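The general pattern of guarding a synchronous call against the callee exiting can be sketched as follows. This is only an illustration under stated assumptions: it uses the standard `gen_server:call/3` rather than RabbitMQ's `gen_server2` and `delegate`, the module and function names are hypothetical, and returning an empty list on exit is an assumption made for the example, not necessarily what `rabbit_amqqueue:consumers/1` returns.

```erlang
-module(consumers_exit_sketch).
-export([safe_call/2]).

%% Hypothetical helper: sends a synchronous request to Pid and returns
%% the reply. If the target is dead or terminates during the call,
%% gen_server:call/3 exits in the calling process; that exit is caught
%% here and an empty list is returned instead.
safe_call(Pid, Request) ->
    try
        gen_server:call(Pid, Request, infinity)
    catch
        exit:_Reason ->
            []
    end.
```

Only the `exit` class is caught, so errors and throws raised in the caller still propagate.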


This is an automatic backport of pull request #12317 done by Mergify.

`{error, timeout}` was already a possible return value, since a classic
queue returns it during termination if
`rabbit_amqqueue:internal_delete/2` fails with that value.

`rabbit_amqqueue:delete/4` already handles this value and converts it
into a protocol error and channel exit. The other caller (MQTT
processor) will be updated in a child commit.

This commit also replaces eager conversions to protocol errors in
rabbit_classic_queue, rabbit_quorum_queue and rabbit_stream_coordinator:
we should return `{error, timeout}` consistently and not hide it in
protocol errors.

(cherry picked from commit 9627903)

`delegate:invoke/2` catches errors but not exits of the delegate
process. Another process might query for a classic queue's consumers
while the classic queue is being deleted or otherwise terminating;
previously, that would cause the calling process to exit.

(cherry picked from commit a65ceb6)
@michaelklishin michaelklishin merged commit 9cbda0e into v4.0.x Sep 17, 2024
199 checks passed
@michaelklishin michaelklishin deleted the mergify/bp/v4.0.x/pr-12317 branch September 17, 2024 02:02