From 2dcd240ec02b168218630b48c26471ef48adc78c Mon Sep 17 00:00:00 2001 From: Vaibhav Date: Mon, 18 Mar 2024 15:20:26 +0530 Subject: [PATCH] fix: lock rows and skip them during concurrent poll --- ...fix_lock_and_skip_rows_in_mq_poll.down.sql | 62 ++++++++++++++++++ ...0_fix_lock_and_skip_rows_in_mq_poll.up.sql | 63 +++++++++++++++++++ 2 files changed, 125 insertions(+) create mode 100644 migrations/20240318094900_fix_lock_and_skip_rows_in_mq_poll.down.sql create mode 100644 migrations/20240318094900_fix_lock_and_skip_rows_in_mq_poll.up.sql diff --git a/migrations/20240318094900_fix_lock_and_skip_rows_in_mq_poll.down.sql b/migrations/20240318094900_fix_lock_and_skip_rows_in_mq_poll.down.sql new file mode 100644 index 0000000..237a44c --- /dev/null +++ b/migrations/20240318094900_fix_lock_and_skip_rows_in_mq_poll.down.sql @@ -0,0 +1,62 @@ + +-- Main entry-point for job runner: pulls a batch of messages from the queue. +CREATE OR REPLACE FUNCTION mq_poll(channel_names TEXT[], batch_size INT DEFAULT 1) +RETURNS TABLE( + id UUID, + is_committed BOOLEAN, + name TEXT, + payload_json TEXT, + payload_bytes BYTEA, + retry_backoff INTERVAL, + wait_time INTERVAL +) AS $$ +BEGIN + RETURN QUERY UPDATE mq_msgs + SET + attempt_at = CASE WHEN mq_msgs.attempts = 1 THEN NULL ELSE NOW() + mq_msgs.retry_backoff END, + attempts = mq_msgs.attempts - 1, + retry_backoff = mq_msgs.retry_backoff * 2 + FROM ( + SELECT + msgs.id + FROM mq_active_channels(channel_names, batch_size) AS active_channels + INNER JOIN LATERAL ( + SELECT mq_msgs.id FROM mq_msgs + WHERE mq_msgs.id != uuid_nil() + AND mq_msgs.attempt_at <= NOW() + AND mq_msgs.channel_name = active_channels.name + AND mq_msgs.channel_args = active_channels.args + AND NOT mq_uuid_exists(mq_msgs.after_message_id) + ORDER BY mq_msgs.attempt_at ASC + LIMIT batch_size + ) AS msgs ON TRUE + LIMIT batch_size + ) AS messages_to_update + LEFT JOIN mq_payloads ON mq_payloads.id = messages_to_update.id + WHERE mq_msgs.id = messages_to_update.id + AND mq_msgs.attempt_at <= NOW() + RETURNING + mq_msgs.id, + mq_msgs.commit_interval IS NULL, + mq_payloads.name, + mq_payloads.payload_json::TEXT, + mq_payloads.payload_bytes, + mq_msgs.retry_backoff / 2, + interval '0' AS wait_time; + + IF NOT FOUND THEN + RETURN QUERY SELECT + NULL::UUID, + NULL::BOOLEAN, + NULL::TEXT, + NULL::TEXT, + NULL::BYTEA, + NULL::INTERVAL, + MIN(mq_msgs.attempt_at) - NOW() + FROM mq_msgs + WHERE mq_msgs.id != uuid_nil() + AND NOT mq_uuid_exists(mq_msgs.after_message_id) + AND (channel_names IS NULL OR mq_msgs.channel_name = ANY(channel_names)); + END IF; +END; +$$ LANGUAGE plpgsql;- Add down migration script here diff --git a/migrations/20240318094900_fix_lock_and_skip_rows_in_mq_poll.up.sql b/migrations/20240318094900_fix_lock_and_skip_rows_in_mq_poll.up.sql new file mode 100644 index 0000000..8bf251b --- /dev/null +++ b/migrations/20240318094900_fix_lock_and_skip_rows_in_mq_poll.up.sql @@ -0,0 +1,63 @@ + +-- Main entry-point for job runner: pulls a batch of messages from the queue. +CREATE OR REPLACE FUNCTION mq_poll(channel_names TEXT[], batch_size INT DEFAULT 1) +RETURNS TABLE( + id UUID, + is_committed BOOLEAN, + name TEXT, + payload_json TEXT, + payload_bytes BYTEA, + retry_backoff INTERVAL, + wait_time INTERVAL +) AS $$ +BEGIN + RETURN QUERY UPDATE mq_msgs + SET + attempt_at = CASE WHEN mq_msgs.attempts = 1 THEN NULL ELSE NOW() + mq_msgs.retry_backoff END, + attempts = mq_msgs.attempts - 1, + retry_backoff = mq_msgs.retry_backoff * 2 + FROM ( + SELECT + msgs.id + FROM mq_active_channels(channel_names, batch_size) AS active_channels + INNER JOIN LATERAL ( + SELECT mq_msgs.id FROM mq_msgs + WHERE mq_msgs.id != uuid_nil() + AND mq_msgs.attempt_at <= NOW() + AND mq_msgs.channel_name = active_channels.name + AND mq_msgs.channel_args = active_channels.args + AND NOT mq_uuid_exists(mq_msgs.after_message_id) + ORDER BY mq_msgs.attempt_at ASC + LIMIT batch_size + FOR UPDATE SKIP LOCKED + ) AS msgs ON TRUE + LIMIT batch_size + ) AS messages_to_update + LEFT JOIN mq_payloads ON mq_payloads.id = messages_to_update.id + WHERE mq_msgs.id = messages_to_update.id + AND mq_msgs.attempt_at <= NOW() + RETURNING + mq_msgs.id, + mq_msgs.commit_interval IS NULL, + mq_payloads.name, + mq_payloads.payload_json::TEXT, + mq_payloads.payload_bytes, + mq_msgs.retry_backoff / 2, + interval '0' AS wait_time; + + IF NOT FOUND THEN + RETURN QUERY SELECT + NULL::UUID, + NULL::BOOLEAN, + NULL::TEXT, + NULL::TEXT, + NULL::BYTEA, + NULL::INTERVAL, + MIN(mq_msgs.attempt_at) - NOW() + FROM mq_msgs + WHERE mq_msgs.id != uuid_nil() + AND NOT mq_uuid_exists(mq_msgs.after_message_id) + AND (channel_names IS NULL OR mq_msgs.channel_name = ANY(channel_names)); + END IF; +END; +$$ LANGUAGE plpgsql;