Skip to content

Commit

Permalink
bullet-proof migration against deadlocks
Browse files Browse the repository at this point in the history
  • Loading branch information
uael committed Jan 10, 2025
1 parent 1cce0b3 commit 8b1c81c
Show file tree
Hide file tree
Showing 2 changed files with 159 additions and 191 deletions.
271 changes: 126 additions & 145 deletions backend/migrations/20250107121225_queue_v2.down.sql
Original file line number Diff line number Diff line change
@@ -1,29 +1,30 @@
-- Add down migration script here
-- no-transaction

-- 1. Drop constraints
START TRANSACTION;
DO $$ BEGIN
LOOP
BEGIN
IF EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = 'v2_job_queue') THEN
ALTER TABLE v2_job_queue DROP CONSTRAINT IF EXISTS v2_job_queue_id_fkey;
LOCK TABLE v2_job_queue, v2_job_runtime IN ACCESS EXCLUSIVE MODE;
ALTER TABLE IF EXISTS v2_job_runtime DROP CONSTRAINT IF EXISTS v2_job_runtime_id_fkey;
END IF;
IF EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = 'v2_job_completed') THEN
ALTER TABLE v2_job_completed DROP CONSTRAINT IF EXISTS v2_job_completed_id_fkey;
ALTER TABLE v2_job_completed ALTER COLUMN status DROP NOT NULL;
ALTER TABLE v2_job_completed ALTER COLUMN completed_at DROP NOT NULL;
END $$;
COMMIT;

START TRANSACTION;
DO $$ BEGIN
IF EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = 'v2_job_queue') THEN
LOCK TABLE v2_job_queue, v2_job_flow_runtime IN ACCESS EXCLUSIVE MODE;
ALTER TABLE IF EXISTS v2_job_flow_runtime DROP CONSTRAINT IF EXISTS v2_job_flow_runtime_id_fkey;
END IF;
EXIT; -- Success, exit the loop
EXCEPTION
WHEN deadlock_detected THEN RAISE NOTICE 'Deadlock detected. Retry';
WHEN others THEN RAISE; -- Re-raise unexpected exceptions
PERFORM pg_sleep(0.5);
END;
END LOOP;
END $$;
COMMIT;

-- 2. Drop all triggers
START TRANSACTION;
ALTER TABLE IF EXISTS v2_job_completed ALTER COLUMN status DROP NOT NULL;
ALTER TABLE IF EXISTS v2_job_completed ALTER COLUMN completed_at DROP NOT NULL;
COMMIT;

-- 2. Drop triggers
DROP FUNCTION IF EXISTS v2_job_queue_after_insert() CASCADE;
DROP FUNCTION IF EXISTS v2_job_queue_before_update() CASCADE;
DROP FUNCTION IF EXISTS v2_job_completed_before_insert() CASCADE;
Expand All @@ -32,153 +33,133 @@ DROP FUNCTION IF EXISTS v2_job_runtime_before_update() CASCADE;
DROP FUNCTION IF EXISTS v2_job_flow_runtime_before_insert() CASCADE;
DROP FUNCTION IF EXISTS v2_job_flow_runtime_before_update() CASCADE;

-- 3. `queue`
-- 3. `v2_job_queue` -> `queue`
START TRANSACTION;
ALTER TABLE IF EXISTS v2_job_queue DROP COLUMN IF EXISTS worker;
ALTER TABLE IF EXISTS v2_job_queue RENAME COLUMN __parent_job TO parent_job;
ALTER TABLE IF EXISTS v2_job_queue RENAME COLUMN __created_by TO created_by;
ALTER TABLE IF EXISTS v2_job_queue RENAME COLUMN __script_hash TO script_hash;
ALTER TABLE IF EXISTS v2_job_queue RENAME COLUMN __script_path TO script_path;
ALTER TABLE IF EXISTS v2_job_queue RENAME COLUMN __args TO args;
ALTER TABLE IF EXISTS v2_job_queue RENAME COLUMN __logs TO logs;
ALTER TABLE IF EXISTS v2_job_queue RENAME COLUMN __raw_code TO raw_code;
ALTER TABLE IF EXISTS v2_job_queue RENAME COLUMN __canceled TO canceled;
ALTER TABLE IF EXISTS v2_job_queue RENAME COLUMN __last_ping TO last_ping;
ALTER TABLE IF EXISTS v2_job_queue RENAME COLUMN __job_kind TO job_kind;
ALTER TABLE IF EXISTS v2_job_queue RENAME COLUMN __env_id TO env_id;
ALTER TABLE IF EXISTS v2_job_queue RENAME COLUMN __schedule_path TO schedule_path;
ALTER TABLE IF EXISTS v2_job_queue RENAME COLUMN __permissioned_as TO permissioned_as;
ALTER TABLE IF EXISTS v2_job_queue RENAME COLUMN __flow_status TO flow_status;
ALTER TABLE IF EXISTS v2_job_queue RENAME COLUMN __raw_flow TO raw_flow;
ALTER TABLE IF EXISTS v2_job_queue RENAME COLUMN __is_flow_step TO is_flow_step;
ALTER TABLE IF EXISTS v2_job_queue RENAME COLUMN __language TO language;
ALTER TABLE IF EXISTS v2_job_queue RENAME COLUMN __same_worker TO same_worker;
ALTER TABLE IF EXISTS v2_job_queue RENAME COLUMN __raw_lock TO raw_lock;
ALTER TABLE IF EXISTS v2_job_queue RENAME COLUMN __pre_run_error TO pre_run_error;
ALTER TABLE IF EXISTS v2_job_queue RENAME COLUMN __email TO email;
ALTER TABLE IF EXISTS v2_job_queue RENAME COLUMN __visible_to_owner TO visible_to_owner;
ALTER TABLE IF EXISTS v2_job_queue RENAME COLUMN __mem_peak TO mem_peak;
ALTER TABLE IF EXISTS v2_job_queue RENAME COLUMN __root_job TO root_job;
ALTER TABLE IF EXISTS v2_job_queue RENAME COLUMN __leaf_jobs TO leaf_jobs;
ALTER TABLE IF EXISTS v2_job_queue RENAME COLUMN __concurrent_limit TO concurrent_limit;
ALTER TABLE IF EXISTS v2_job_queue RENAME COLUMN __concurrency_time_window_s TO concurrency_time_window_s;
ALTER TABLE IF EXISTS v2_job_queue RENAME COLUMN __timeout TO timeout;
ALTER TABLE IF EXISTS v2_job_queue RENAME COLUMN __flow_step_id TO flow_step_id;
ALTER TABLE IF EXISTS v2_job_queue RENAME COLUMN __cache_ttl TO cache_ttl;
COMMIT;

START TRANSACTION;
DO $$ BEGIN
LOOP
BEGIN
IF NOT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = 'v2_job_queue') THEN
EXIT;
IF EXISTS (SELECT 1 FROM pg_views WHERE viewname = 'queue') THEN
DROP VIEW IF EXISTS queue;
ALTER TABLE IF EXISTS v2_job_queue RENAME TO queue;
END IF;
ALTER TABLE IF EXISTS v2_job_queue DROP COLUMN IF EXISTS worker;
ALTER TABLE IF EXISTS v2_job_queue RENAME COLUMN __parent_job TO parent_job;
ALTER TABLE IF EXISTS v2_job_queue RENAME COLUMN __created_by TO created_by;
ALTER TABLE IF EXISTS v2_job_queue RENAME COLUMN __script_hash TO script_hash;
ALTER TABLE IF EXISTS v2_job_queue RENAME COLUMN __script_path TO script_path;
ALTER TABLE IF EXISTS v2_job_queue RENAME COLUMN __args TO args;
ALTER TABLE IF EXISTS v2_job_queue RENAME COLUMN __logs TO logs;
ALTER TABLE IF EXISTS v2_job_queue RENAME COLUMN __raw_code TO raw_code;
ALTER TABLE IF EXISTS v2_job_queue RENAME COLUMN __canceled TO canceled;
ALTER TABLE IF EXISTS v2_job_queue RENAME COLUMN __last_ping TO last_ping;
ALTER TABLE IF EXISTS v2_job_queue RENAME COLUMN __job_kind TO job_kind;
ALTER TABLE IF EXISTS v2_job_queue RENAME COLUMN __env_id TO env_id;
ALTER TABLE IF EXISTS v2_job_queue RENAME COLUMN __schedule_path TO schedule_path;
ALTER TABLE IF EXISTS v2_job_queue RENAME COLUMN __permissioned_as TO permissioned_as;
ALTER TABLE IF EXISTS v2_job_queue RENAME COLUMN __flow_status TO flow_status;
ALTER TABLE IF EXISTS v2_job_queue RENAME COLUMN __raw_flow TO raw_flow;
ALTER TABLE IF EXISTS v2_job_queue RENAME COLUMN __is_flow_step TO is_flow_step;
ALTER TABLE IF EXISTS v2_job_queue RENAME COLUMN __language TO language;
ALTER TABLE IF EXISTS v2_job_queue RENAME COLUMN __same_worker TO same_worker;
ALTER TABLE IF EXISTS v2_job_queue RENAME COLUMN __raw_lock TO raw_lock;
ALTER TABLE IF EXISTS v2_job_queue RENAME COLUMN __pre_run_error TO pre_run_error;
ALTER TABLE IF EXISTS v2_job_queue RENAME COLUMN __email TO email;
ALTER TABLE IF EXISTS v2_job_queue RENAME COLUMN __visible_to_owner TO visible_to_owner;
ALTER TABLE IF EXISTS v2_job_queue RENAME COLUMN __mem_peak TO mem_peak;
ALTER TABLE IF EXISTS v2_job_queue RENAME COLUMN __root_job TO root_job;
ALTER TABLE IF EXISTS v2_job_queue RENAME COLUMN __leaf_jobs TO leaf_jobs;
ALTER TABLE IF EXISTS v2_job_queue RENAME COLUMN __concurrent_limit TO concurrent_limit;
ALTER TABLE IF EXISTS v2_job_queue RENAME COLUMN __concurrency_time_window_s TO concurrency_time_window_s;
ALTER TABLE IF EXISTS v2_job_queue RENAME COLUMN __timeout TO timeout;
ALTER TABLE IF EXISTS v2_job_queue RENAME COLUMN __flow_step_id TO flow_step_id;
ALTER TABLE IF EXISTS v2_job_queue RENAME COLUMN __cache_ttl TO cache_ttl;
DROP VIEW IF EXISTS queue;
ALTER TABLE IF EXISTS v2_job_queue RENAME TO queue;
EXIT; -- Success, exit the loop
EXCEPTION
WHEN deadlock_detected THEN RAISE NOTICE 'Deadlock detected. Retry';
WHEN others THEN RAISE; -- Re-raise unexpected exceptions
PERFORM pg_sleep(0.5);
END;
END LOOP;
END $$;
COMMIT;

-- 4. `completed_job`
-- 4. `v2_job_completed` -> `completed_job`
START TRANSACTION;
ALTER TABLE IF EXISTS v2_job_completed
DROP COLUMN IF EXISTS status CASCADE,
DROP COLUMN IF EXISTS completed_at CASCADE,
DROP COLUMN IF EXISTS worker CASCADE;
ALTER TABLE IF EXISTS v2_job_completed RENAME COLUMN memory_peak TO mem_peak;
ALTER TABLE IF EXISTS v2_job_completed RENAME COLUMN __parent_job TO parent_job;
ALTER TABLE IF EXISTS v2_job_completed RENAME COLUMN __created_by TO created_by;
ALTER TABLE IF EXISTS v2_job_completed RENAME COLUMN __created_at TO created_at;
ALTER TABLE IF EXISTS v2_job_completed RENAME COLUMN __success TO success;
ALTER TABLE IF EXISTS v2_job_completed RENAME COLUMN __script_hash TO script_hash;
ALTER TABLE IF EXISTS v2_job_completed RENAME COLUMN __script_path TO script_path;
ALTER TABLE IF EXISTS v2_job_completed RENAME COLUMN __args TO args;
ALTER TABLE IF EXISTS v2_job_completed RENAME COLUMN __deleted TO deleted;
ALTER TABLE IF EXISTS v2_job_completed RENAME COLUMN __raw_code TO raw_code;
ALTER TABLE IF EXISTS v2_job_completed RENAME COLUMN __canceled TO canceled;
ALTER TABLE IF EXISTS v2_job_completed RENAME COLUMN __job_kind TO job_kind;
ALTER TABLE IF EXISTS v2_job_completed RENAME COLUMN __env_id TO env_id;
ALTER TABLE IF EXISTS v2_job_completed RENAME COLUMN __schedule_path TO schedule_path;
ALTER TABLE IF EXISTS v2_job_completed RENAME COLUMN __permissioned_as TO permissioned_as;
ALTER TABLE IF EXISTS v2_job_completed RENAME COLUMN __raw_flow TO raw_flow;
ALTER TABLE IF EXISTS v2_job_completed RENAME COLUMN __is_flow_step TO is_flow_step;
ALTER TABLE IF EXISTS v2_job_completed RENAME COLUMN __language TO language;
ALTER TABLE IF EXISTS v2_job_completed RENAME COLUMN __is_skipped TO is_skipped;
ALTER TABLE IF EXISTS v2_job_completed RENAME COLUMN __raw_lock TO raw_lock;
ALTER TABLE IF EXISTS v2_job_completed RENAME COLUMN __email TO email;
ALTER TABLE IF EXISTS v2_job_completed RENAME COLUMN __visible_to_owner TO visible_to_owner;
ALTER TABLE IF EXISTS v2_job_completed RENAME COLUMN __tag TO tag;
ALTER TABLE IF EXISTS v2_job_completed RENAME COLUMN __priority TO priority;
COMMIT;

START TRANSACTION;
DO $$ BEGIN
LOOP
BEGIN
IF NOT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = 'v2_job_completed') THEN
EXIT;
IF EXISTS (SELECT 1 FROM pg_views WHERE viewname = 'completed_job') THEN
DROP VIEW IF EXISTS completed_job;
ALTER TABLE IF EXISTS v2_job_completed RENAME TO completed_job;
END IF;
ALTER TABLE IF EXISTS v2_job_completed
DROP COLUMN IF EXISTS status CASCADE,
DROP COLUMN IF EXISTS completed_at CASCADE,
DROP COLUMN IF EXISTS worker CASCADE;
ALTER TABLE IF EXISTS v2_job_completed RENAME COLUMN memory_peak TO mem_peak;
ALTER TABLE IF EXISTS v2_job_completed RENAME COLUMN __parent_job TO parent_job;
ALTER TABLE IF EXISTS v2_job_completed RENAME COLUMN __created_by TO created_by;
ALTER TABLE IF EXISTS v2_job_completed RENAME COLUMN __created_at TO created_at;
ALTER TABLE IF EXISTS v2_job_completed RENAME COLUMN __success TO success;
ALTER TABLE IF EXISTS v2_job_completed RENAME COLUMN __script_hash TO script_hash;
ALTER TABLE IF EXISTS v2_job_completed RENAME COLUMN __script_path TO script_path;
ALTER TABLE IF EXISTS v2_job_completed RENAME COLUMN __args TO args;
ALTER TABLE IF EXISTS v2_job_completed RENAME COLUMN __deleted TO deleted;
ALTER TABLE IF EXISTS v2_job_completed RENAME COLUMN __raw_code TO raw_code;
ALTER TABLE IF EXISTS v2_job_completed RENAME COLUMN __canceled TO canceled;
ALTER TABLE IF EXISTS v2_job_completed RENAME COLUMN __job_kind TO job_kind;
ALTER TABLE IF EXISTS v2_job_completed RENAME COLUMN __env_id TO env_id;
ALTER TABLE IF EXISTS v2_job_completed RENAME COLUMN __schedule_path TO schedule_path;
ALTER TABLE IF EXISTS v2_job_completed RENAME COLUMN __permissioned_as TO permissioned_as;
ALTER TABLE IF EXISTS v2_job_completed RENAME COLUMN __raw_flow TO raw_flow;
ALTER TABLE IF EXISTS v2_job_completed RENAME COLUMN __is_flow_step TO is_flow_step;
ALTER TABLE IF EXISTS v2_job_completed RENAME COLUMN __language TO language;
ALTER TABLE IF EXISTS v2_job_completed RENAME COLUMN __is_skipped TO is_skipped;
ALTER TABLE IF EXISTS v2_job_completed RENAME COLUMN __raw_lock TO raw_lock;
ALTER TABLE IF EXISTS v2_job_completed RENAME COLUMN __email TO email;
ALTER TABLE IF EXISTS v2_job_completed RENAME COLUMN __visible_to_owner TO visible_to_owner;
ALTER TABLE IF EXISTS v2_job_completed RENAME COLUMN __tag TO tag;
ALTER TABLE IF EXISTS v2_job_completed RENAME COLUMN __priority TO priority;
DROP VIEW IF EXISTS completed_job;
ALTER TABLE IF EXISTS v2_job_completed RENAME TO completed_job;
EXIT; -- Success, exit the loop
EXCEPTION
WHEN deadlock_detected THEN RAISE NOTICE 'Deadlock detected. Retry';
WHEN others THEN RAISE; -- Re-raise unexpected exceptions
PERFORM pg_sleep(0.5);
END;
END LOOP;
END $$;
COMMIT;

-- 5. `job`
-- 5. `v2_job` -> `job`
START TRANSACTION;
DROP POLICY IF EXISTS see_folder_extra_perms_user ON v2_job;
DROP POLICY IF EXISTS see_own_path ON v2_job;
DROP POLICY IF EXISTS see_member_path ON v2_job;
DROP POLICY IF EXISTS see_own ON v2_job;
DROP POLICY IF EXISTS see_member ON v2_job;
ALTER TABLE IF EXISTS v2_job
DROP COLUMN IF EXISTS created_at CASCADE,
DROP COLUMN IF EXISTS created_by CASCADE,
DROP COLUMN IF EXISTS created_by_email CASCADE,
DROP COLUMN IF EXISTS permissioned_as CASCADE,
DROP COLUMN IF EXISTS kind CASCADE,
DROP COLUMN IF EXISTS entity_id CASCADE,
DROP COLUMN IF EXISTS entity_path CASCADE,
DROP COLUMN IF EXISTS parent_job CASCADE,
DROP COLUMN IF EXISTS script_lang CASCADE,
DROP COLUMN IF EXISTS flow_step CASCADE,
DROP COLUMN IF EXISTS flow_step_id CASCADE,
DROP COLUMN IF EXISTS flow_root_job CASCADE,
DROP COLUMN IF EXISTS schedule_path CASCADE,
DROP COLUMN IF EXISTS same_worker CASCADE,
DROP COLUMN IF EXISTS visible_to_owner CASCADE,
DROP COLUMN IF EXISTS concurrent_limit CASCADE,
DROP COLUMN IF EXISTS concurrency_time_window_s CASCADE,
DROP COLUMN IF EXISTS cache_ttl CASCADE,
DROP COLUMN IF EXISTS timeout CASCADE,
DROP COLUMN IF EXISTS priority CASCADE,
DROP COLUMN IF EXISTS args CASCADE,
DROP COLUMN IF EXISTS pre_run_error CASCADE;
COMMIT;

START TRANSACTION;
DO $$ BEGIN
LOOP
BEGIN
IF NOT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = 'v2_job') THEN
EXIT;
IF EXISTS (SELECT 1 FROM pg_views WHERE viewname = 'job') THEN
DROP VIEW IF EXISTS job;
ALTER TABLE IF EXISTS v2_job RENAME TO job;
END IF;
DROP POLICY IF EXISTS see_folder_extra_perms_user ON v2_job;
DROP POLICY IF EXISTS see_own_path ON v2_job;
DROP POLICY IF EXISTS see_member_path ON v2_job;
DROP POLICY IF EXISTS see_own ON v2_job;
DROP POLICY IF EXISTS see_member ON v2_job;
ALTER TABLE IF EXISTS v2_job
DROP COLUMN IF EXISTS created_at CASCADE,
DROP COLUMN IF EXISTS created_by CASCADE,
DROP COLUMN IF EXISTS created_by_email CASCADE,
DROP COLUMN IF EXISTS permissioned_as CASCADE,
DROP COLUMN IF EXISTS kind CASCADE,
DROP COLUMN IF EXISTS entity_id CASCADE,
DROP COLUMN IF EXISTS entity_path CASCADE,
DROP COLUMN IF EXISTS parent_job CASCADE,
DROP COLUMN IF EXISTS script_lang CASCADE,
DROP COLUMN IF EXISTS flow_step CASCADE,
DROP COLUMN IF EXISTS flow_step_id CASCADE,
DROP COLUMN IF EXISTS flow_root_job CASCADE,
DROP COLUMN IF EXISTS schedule_path CASCADE,
DROP COLUMN IF EXISTS same_worker CASCADE,
DROP COLUMN IF EXISTS visible_to_owner CASCADE,
DROP COLUMN IF EXISTS concurrent_limit CASCADE,
DROP COLUMN IF EXISTS concurrency_time_window_s CASCADE,
DROP COLUMN IF EXISTS cache_ttl CASCADE,
DROP COLUMN IF EXISTS timeout CASCADE,
DROP COLUMN IF EXISTS priority CASCADE,
DROP COLUMN IF EXISTS args CASCADE,
DROP COLUMN IF EXISTS pre_run_error CASCADE;
DROP VIEW IF EXISTS job;
ALTER TABLE IF EXISTS v2_job RENAME TO job;
EXIT; -- Success, exit the loop
EXCEPTION
WHEN deadlock_detected THEN RAISE NOTICE 'Deadlock detected. Retry';
WHEN others THEN RAISE; -- Re-raise unexpected exceptions
PERFORM pg_sleep(0.5);
END;
END LOOP;
END $$;
COMMIT;

-- 6. Drop new tables
START TRANSACTION;
DROP TYPE IF EXISTS job_status CASCADE;
DROP TABLE IF EXISTS v2_job_runtime CASCADE;
Expand Down
Loading

0 comments on commit 8b1c81c

Please sign in to comment.