diff --git a/backend/migrations/20250107121225_queue_v2.down.sql b/backend/migrations/20250107121225_queue_v2.down.sql index 545b52aa8162e..0acac9885f486 100644 --- a/backend/migrations/20250107121225_queue_v2.down.sql +++ b/backend/migrations/20250107121225_queue_v2.down.sql @@ -1,29 +1,30 @@ --- Add down migration script here +-- no-transaction -- 1. Drop constraints START TRANSACTION; DO $$ BEGIN -LOOP -BEGIN IF EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = 'v2_job_queue') THEN - ALTER TABLE v2_job_queue DROP CONSTRAINT IF EXISTS v2_job_queue_id_fkey; + LOCK TABLE v2_job_queue, v2_job_runtime IN ACCESS EXCLUSIVE MODE; + ALTER TABLE IF EXISTS v2_job_runtime DROP CONSTRAINT IF EXISTS v2_job_runtime_id_fkey; END IF; - IF EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = 'v2_job_completed') THEN - ALTER TABLE v2_job_completed DROP CONSTRAINT IF EXISTS v2_job_completed_id_fkey; - ALTER TABLE v2_job_completed ALTER COLUMN status DROP NOT NULL; - ALTER TABLE v2_job_completed ALTER COLUMN completed_at DROP NOT NULL; +END $$; +COMMIT; + +START TRANSACTION; +DO $$ BEGIN + IF EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = 'v2_job_queue') THEN + LOCK TABLE v2_job_queue, v2_job_flow_runtime IN ACCESS EXCLUSIVE MODE; + ALTER TABLE IF EXISTS v2_job_flow_runtime DROP CONSTRAINT IF EXISTS v2_job_flow_runtime_id_fkey; END IF; - EXIT; -- Success, exit the loop -EXCEPTION - WHEN deadlock_detected THEN RAISE NOTICE 'Deadlock detected. Retry'; - WHEN others THEN RAISE; -- Re-raise unexpected exceptions - PERFORM pg_sleep(0.5); -END; -END LOOP; END $$; COMMIT; --- 2. Drop all triggers +START TRANSACTION; +ALTER TABLE IF EXISTS v2_job_completed ALTER COLUMN status DROP NOT NULL; +ALTER TABLE IF EXISTS v2_job_completed ALTER COLUMN completed_at DROP NOT NULL; +COMMIT; + +-- 2. Drop triggers DROP FUNCTION IF EXISTS v2_job_queue_after_insert() CASCADE; DROP FUNCTION IF EXISTS v2_job_queue_before_update() CASCADE; DROP FUNCTION IF EXISTS v2_job_completed_before_insert() CASCADE; @@ -32,153 +33,133 @@ DROP FUNCTION IF EXISTS v2_job_runtime_before_update() CASCADE; DROP FUNCTION IF EXISTS v2_job_flow_runtime_before_insert() CASCADE; DROP FUNCTION IF EXISTS v2_job_flow_runtime_before_update() CASCADE; --- 3. `queue` +-- 3. `v2_job_queue` -> `queue` +START TRANSACTION; +ALTER TABLE IF EXISTS v2_job_queue DROP COLUMN IF EXISTS worker; +ALTER TABLE IF EXISTS v2_job_queue RENAME COLUMN __parent_job TO parent_job; +ALTER TABLE IF EXISTS v2_job_queue RENAME COLUMN __created_by TO created_by; +ALTER TABLE IF EXISTS v2_job_queue RENAME COLUMN __script_hash TO script_hash; +ALTER TABLE IF EXISTS v2_job_queue RENAME COLUMN __script_path TO script_path; +ALTER TABLE IF EXISTS v2_job_queue RENAME COLUMN __args TO args; +ALTER TABLE IF EXISTS v2_job_queue RENAME COLUMN __logs TO logs; +ALTER TABLE IF EXISTS v2_job_queue RENAME COLUMN __raw_code TO raw_code; +ALTER TABLE IF EXISTS v2_job_queue RENAME COLUMN __canceled TO canceled; +ALTER TABLE IF EXISTS v2_job_queue RENAME COLUMN __last_ping TO last_ping; +ALTER TABLE IF EXISTS v2_job_queue RENAME COLUMN __job_kind TO job_kind; +ALTER TABLE IF EXISTS v2_job_queue RENAME COLUMN __env_id TO env_id; +ALTER TABLE IF EXISTS v2_job_queue RENAME COLUMN __schedule_path TO schedule_path; +ALTER TABLE IF EXISTS v2_job_queue RENAME COLUMN __permissioned_as TO permissioned_as; +ALTER TABLE IF EXISTS v2_job_queue RENAME COLUMN __flow_status TO flow_status; +ALTER TABLE IF EXISTS v2_job_queue RENAME COLUMN __raw_flow TO raw_flow; +ALTER TABLE IF EXISTS v2_job_queue RENAME COLUMN __is_flow_step TO is_flow_step; +ALTER TABLE IF EXISTS v2_job_queue RENAME COLUMN __language TO language; +ALTER TABLE IF EXISTS v2_job_queue RENAME COLUMN __same_worker TO same_worker; +ALTER TABLE IF EXISTS v2_job_queue RENAME COLUMN __raw_lock TO raw_lock; +ALTER TABLE IF EXISTS v2_job_queue RENAME COLUMN __pre_run_error TO pre_run_error; +ALTER TABLE IF EXISTS v2_job_queue RENAME COLUMN __email TO email; +ALTER TABLE IF EXISTS v2_job_queue RENAME COLUMN __visible_to_owner TO visible_to_owner; +ALTER TABLE IF EXISTS v2_job_queue RENAME COLUMN __mem_peak TO mem_peak; +ALTER TABLE IF EXISTS v2_job_queue RENAME COLUMN __root_job TO root_job; +ALTER TABLE IF EXISTS v2_job_queue RENAME COLUMN __leaf_jobs TO leaf_jobs; +ALTER TABLE IF EXISTS v2_job_queue RENAME COLUMN __concurrent_limit TO concurrent_limit; +ALTER TABLE IF EXISTS v2_job_queue RENAME COLUMN __concurrency_time_window_s TO concurrency_time_window_s; +ALTER TABLE IF EXISTS v2_job_queue RENAME COLUMN __timeout TO timeout; +ALTER TABLE IF EXISTS v2_job_queue RENAME COLUMN __flow_step_id TO flow_step_id; +ALTER TABLE IF EXISTS v2_job_queue RENAME COLUMN __cache_ttl TO cache_ttl; +COMMIT; + START TRANSACTION; DO $$ BEGIN -LOOP -BEGIN - IF NOT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = 'v2_job_queue') THEN - EXIT; + IF EXISTS (SELECT 1 FROM pg_views WHERE viewname = 'queue') THEN + DROP VIEW IF EXISTS queue; + ALTER TABLE IF EXISTS v2_job_queue RENAME TO queue; END IF; - ALTER TABLE IF EXISTS v2_job_queue DROP COLUMN IF EXISTS worker; - ALTER TABLE IF EXISTS v2_job_queue RENAME COLUMN __parent_job TO parent_job; - ALTER TABLE IF EXISTS v2_job_queue RENAME COLUMN __created_by TO created_by; - ALTER TABLE IF EXISTS v2_job_queue RENAME COLUMN __script_hash TO script_hash; - ALTER TABLE IF EXISTS v2_job_queue RENAME COLUMN __script_path TO script_path; - ALTER TABLE IF EXISTS v2_job_queue RENAME COLUMN __args TO args; - ALTER TABLE IF EXISTS v2_job_queue RENAME COLUMN __logs TO logs; - ALTER TABLE IF EXISTS v2_job_queue RENAME COLUMN __raw_code TO raw_code; - ALTER TABLE IF EXISTS v2_job_queue RENAME COLUMN __canceled TO canceled; - ALTER TABLE IF EXISTS v2_job_queue RENAME COLUMN __last_ping TO last_ping; - ALTER TABLE IF EXISTS v2_job_queue RENAME COLUMN __job_kind TO job_kind; - ALTER TABLE IF EXISTS v2_job_queue RENAME COLUMN __env_id TO env_id; - ALTER TABLE IF EXISTS v2_job_queue RENAME COLUMN __schedule_path TO schedule_path; - ALTER TABLE IF EXISTS v2_job_queue RENAME COLUMN __permissioned_as TO permissioned_as; - ALTER TABLE IF EXISTS v2_job_queue RENAME COLUMN __flow_status TO flow_status; - ALTER TABLE IF EXISTS v2_job_queue RENAME COLUMN __raw_flow TO raw_flow; - ALTER TABLE IF EXISTS v2_job_queue RENAME COLUMN __is_flow_step TO is_flow_step; - ALTER TABLE IF EXISTS v2_job_queue RENAME COLUMN __language TO language; - ALTER TABLE IF EXISTS v2_job_queue RENAME COLUMN __same_worker TO same_worker; - ALTER TABLE IF EXISTS v2_job_queue RENAME COLUMN __raw_lock TO raw_lock; - ALTER TABLE IF EXISTS v2_job_queue RENAME COLUMN __pre_run_error TO pre_run_error; - ALTER TABLE IF EXISTS v2_job_queue RENAME COLUMN __email TO email; - ALTER TABLE IF EXISTS v2_job_queue RENAME COLUMN __visible_to_owner TO visible_to_owner; - ALTER TABLE IF EXISTS v2_job_queue RENAME COLUMN __mem_peak TO mem_peak; - ALTER TABLE IF EXISTS v2_job_queue RENAME COLUMN __root_job TO root_job; - ALTER TABLE IF EXISTS v2_job_queue RENAME COLUMN __leaf_jobs TO leaf_jobs; - ALTER TABLE IF EXISTS v2_job_queue RENAME COLUMN __concurrent_limit TO concurrent_limit; - ALTER TABLE IF EXISTS v2_job_queue RENAME COLUMN __concurrency_time_window_s TO concurrency_time_window_s; - ALTER TABLE IF EXISTS v2_job_queue RENAME COLUMN __timeout TO timeout; - ALTER TABLE IF EXISTS v2_job_queue RENAME COLUMN __flow_step_id TO flow_step_id; - ALTER TABLE IF EXISTS v2_job_queue RENAME COLUMN __cache_ttl TO cache_ttl; - DROP VIEW IF EXISTS queue; - ALTER TABLE IF EXISTS v2_job_queue RENAME TO queue; - EXIT; -- Success, exit the loop -EXCEPTION - WHEN deadlock_detected THEN RAISE NOTICE 'Deadlock detected. Retry'; - WHEN others THEN RAISE; -- Re-raise unexpected exceptions - PERFORM pg_sleep(0.5); -END; -END LOOP; END $$; COMMIT; --- 4. `completed_job` +-- 4. `v2_job_completed` -> `completed_job` +START TRANSACTION; +ALTER TABLE IF EXISTS v2_job_completed + DROP COLUMN IF EXISTS status CASCADE, + DROP COLUMN IF EXISTS completed_at CASCADE, + DROP COLUMN IF EXISTS worker CASCADE; +ALTER TABLE IF EXISTS v2_job_completed RENAME COLUMN memory_peak TO mem_peak; +ALTER TABLE IF EXISTS v2_job_completed RENAME COLUMN __parent_job TO parent_job; +ALTER TABLE IF EXISTS v2_job_completed RENAME COLUMN __created_by TO created_by; +ALTER TABLE IF EXISTS v2_job_completed RENAME COLUMN __created_at TO created_at; +ALTER TABLE IF EXISTS v2_job_completed RENAME COLUMN __success TO success; +ALTER TABLE IF EXISTS v2_job_completed RENAME COLUMN __script_hash TO script_hash; +ALTER TABLE IF EXISTS v2_job_completed RENAME COLUMN __script_path TO script_path; +ALTER TABLE IF EXISTS v2_job_completed RENAME COLUMN __args TO args; +ALTER TABLE IF EXISTS v2_job_completed RENAME COLUMN __deleted TO deleted; +ALTER TABLE IF EXISTS v2_job_completed RENAME COLUMN __raw_code TO raw_code; +ALTER TABLE IF EXISTS v2_job_completed RENAME COLUMN __canceled TO canceled; +ALTER TABLE IF EXISTS v2_job_completed RENAME COLUMN __job_kind TO job_kind; +ALTER TABLE IF EXISTS v2_job_completed RENAME COLUMN __env_id TO env_id; +ALTER TABLE IF EXISTS v2_job_completed RENAME COLUMN __schedule_path TO schedule_path; +ALTER TABLE IF EXISTS v2_job_completed RENAME COLUMN __permissioned_as TO permissioned_as; +ALTER TABLE IF EXISTS v2_job_completed RENAME COLUMN __raw_flow TO raw_flow; +ALTER TABLE IF EXISTS v2_job_completed RENAME COLUMN __is_flow_step TO is_flow_step; +ALTER TABLE IF EXISTS v2_job_completed RENAME COLUMN __language TO language; +ALTER TABLE IF EXISTS v2_job_completed RENAME COLUMN __is_skipped TO is_skipped; +ALTER TABLE IF EXISTS v2_job_completed RENAME COLUMN __raw_lock TO raw_lock; +ALTER TABLE IF EXISTS v2_job_completed RENAME COLUMN __email TO email; +ALTER TABLE IF EXISTS v2_job_completed RENAME COLUMN __visible_to_owner TO visible_to_owner; +ALTER TABLE IF EXISTS v2_job_completed RENAME COLUMN __tag TO tag; +ALTER TABLE IF EXISTS v2_job_completed RENAME COLUMN __priority TO priority; +COMMIT; + START TRANSACTION; DO $$ BEGIN -LOOP -BEGIN - IF NOT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = 'v2_job_completed') THEN - EXIT; + IF EXISTS (SELECT 1 FROM pg_views WHERE viewname = 'completed_job') THEN + DROP VIEW IF EXISTS completed_job; + ALTER TABLE IF EXISTS v2_job_completed RENAME TO completed_job; END IF; - ALTER TABLE IF EXISTS v2_job_completed - DROP COLUMN IF EXISTS status CASCADE, - DROP COLUMN IF EXISTS completed_at CASCADE, - DROP COLUMN IF EXISTS worker CASCADE; - ALTER TABLE IF EXISTS v2_job_completed RENAME COLUMN memory_peak TO mem_peak; - ALTER TABLE IF EXISTS v2_job_completed RENAME COLUMN __parent_job TO parent_job; - ALTER TABLE IF EXISTS v2_job_completed RENAME COLUMN __created_by TO created_by; - ALTER TABLE IF EXISTS v2_job_completed RENAME COLUMN __created_at TO created_at; - ALTER TABLE IF EXISTS v2_job_completed RENAME COLUMN __success TO success; - ALTER TABLE IF EXISTS v2_job_completed RENAME COLUMN __script_hash TO script_hash; - ALTER TABLE IF EXISTS v2_job_completed RENAME COLUMN __script_path TO script_path; - ALTER TABLE IF EXISTS v2_job_completed RENAME COLUMN __args TO args; - ALTER TABLE IF EXISTS v2_job_completed RENAME COLUMN __deleted TO deleted; - ALTER TABLE IF EXISTS v2_job_completed RENAME COLUMN __raw_code TO raw_code; - ALTER TABLE IF EXISTS v2_job_completed RENAME COLUMN __canceled TO canceled; - ALTER TABLE IF EXISTS v2_job_completed RENAME COLUMN __job_kind TO job_kind; - ALTER TABLE IF EXISTS v2_job_completed RENAME COLUMN __env_id TO env_id; - ALTER TABLE IF EXISTS v2_job_completed RENAME COLUMN __schedule_path TO schedule_path; - ALTER TABLE IF EXISTS v2_job_completed RENAME COLUMN __permissioned_as TO permissioned_as; - ALTER TABLE IF EXISTS v2_job_completed RENAME COLUMN __raw_flow TO raw_flow; - ALTER TABLE IF EXISTS v2_job_completed RENAME COLUMN __is_flow_step TO is_flow_step; - ALTER TABLE IF EXISTS v2_job_completed RENAME COLUMN __language TO language; - ALTER TABLE IF EXISTS v2_job_completed RENAME COLUMN __is_skipped TO is_skipped; - ALTER TABLE IF EXISTS v2_job_completed RENAME COLUMN __raw_lock TO raw_lock; - ALTER TABLE IF EXISTS v2_job_completed RENAME COLUMN __email TO email; - ALTER TABLE IF EXISTS v2_job_completed RENAME COLUMN __visible_to_owner TO visible_to_owner; - ALTER TABLE IF EXISTS v2_job_completed RENAME COLUMN __tag TO tag; - ALTER TABLE IF EXISTS v2_job_completed RENAME COLUMN __priority TO priority; - DROP VIEW IF EXISTS completed_job; - ALTER TABLE IF EXISTS v2_job_completed RENAME TO completed_job; - EXIT; -- Success, exit the loop -EXCEPTION - WHEN deadlock_detected THEN RAISE NOTICE 'Deadlock detected. Retry'; - WHEN others THEN RAISE; -- Re-raise unexpected exceptions - PERFORM pg_sleep(0.5); -END; -END LOOP; END $$; COMMIT; --- 5. `job` +-- 5. `v2_job` -> `job` +START TRANSACTION; +DROP POLICY IF EXISTS see_folder_extra_perms_user ON v2_job; +DROP POLICY IF EXISTS see_own_path ON v2_job; +DROP POLICY IF EXISTS see_member_path ON v2_job; +DROP POLICY IF EXISTS see_own ON v2_job; +DROP POLICY IF EXISTS see_member ON v2_job; +ALTER TABLE IF EXISTS v2_job + DROP COLUMN IF EXISTS created_at CASCADE, + DROP COLUMN IF EXISTS created_by CASCADE, + DROP COLUMN IF EXISTS created_by_email CASCADE, + DROP COLUMN IF EXISTS permissioned_as CASCADE, + DROP COLUMN IF EXISTS kind CASCADE, + DROP COLUMN IF EXISTS entity_id CASCADE, + DROP COLUMN IF EXISTS entity_path CASCADE, + DROP COLUMN IF EXISTS parent_job CASCADE, + DROP COLUMN IF EXISTS script_lang CASCADE, + DROP COLUMN IF EXISTS flow_step CASCADE, + DROP COLUMN IF EXISTS flow_step_id CASCADE, + DROP COLUMN IF EXISTS flow_root_job CASCADE, + DROP COLUMN IF EXISTS schedule_path CASCADE, + DROP COLUMN IF EXISTS same_worker CASCADE, + DROP COLUMN IF EXISTS visible_to_owner CASCADE, + DROP COLUMN IF EXISTS concurrent_limit CASCADE, + DROP COLUMN IF EXISTS concurrency_time_window_s CASCADE, + DROP COLUMN IF EXISTS cache_ttl CASCADE, + DROP COLUMN IF EXISTS timeout CASCADE, + DROP COLUMN IF EXISTS priority CASCADE, + DROP COLUMN IF EXISTS args CASCADE, + DROP COLUMN IF EXISTS pre_run_error CASCADE; +COMMIT; + START TRANSACTION; DO $$ BEGIN -LOOP -BEGIN - IF NOT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = 'v2_job') THEN - EXIT; + IF EXISTS (SELECT 1 FROM pg_views WHERE viewname = 'job') THEN + DROP VIEW IF EXISTS job; + ALTER TABLE IF EXISTS v2_job RENAME TO job; END IF; - DROP POLICY IF EXISTS see_folder_extra_perms_user ON v2_job; - DROP POLICY IF EXISTS see_own_path ON v2_job; - DROP POLICY IF EXISTS see_member_path ON v2_job; - DROP POLICY IF EXISTS see_own ON v2_job; - DROP POLICY IF EXISTS see_member ON v2_job; - ALTER TABLE IF EXISTS v2_job - DROP COLUMN IF EXISTS created_at CASCADE, - DROP COLUMN IF EXISTS created_by CASCADE, - DROP COLUMN IF EXISTS created_by_email CASCADE, - DROP COLUMN IF EXISTS permissioned_as CASCADE, - DROP COLUMN IF EXISTS kind CASCADE, - DROP COLUMN IF EXISTS entity_id CASCADE, - DROP COLUMN IF EXISTS entity_path CASCADE, - DROP COLUMN IF EXISTS parent_job CASCADE, - DROP COLUMN IF EXISTS script_lang CASCADE, - DROP COLUMN IF EXISTS flow_step CASCADE, - DROP COLUMN IF EXISTS flow_step_id CASCADE, - DROP COLUMN IF EXISTS flow_root_job CASCADE, - DROP COLUMN IF EXISTS schedule_path CASCADE, - DROP COLUMN IF EXISTS same_worker CASCADE, - DROP COLUMN IF EXISTS visible_to_owner CASCADE, - DROP COLUMN IF EXISTS concurrent_limit CASCADE, - DROP COLUMN IF EXISTS concurrency_time_window_s CASCADE, - DROP COLUMN IF EXISTS cache_ttl CASCADE, - DROP COLUMN IF EXISTS timeout CASCADE, - DROP COLUMN IF EXISTS priority CASCADE, - DROP COLUMN IF EXISTS args CASCADE, - DROP COLUMN IF EXISTS pre_run_error CASCADE; - DROP VIEW IF EXISTS job; - ALTER TABLE IF EXISTS v2_job RENAME TO job; - EXIT; -- Success, exit the loop -EXCEPTION - WHEN deadlock_detected THEN RAISE NOTICE 'Deadlock detected. Retry'; - WHEN others THEN RAISE; -- Re-raise unexpected exceptions - PERFORM pg_sleep(0.5); -END; -END LOOP; END $$; COMMIT; +-- 6. Drop new tables START TRANSACTION; DROP TYPE IF EXISTS job_status CASCADE; DROP TABLE IF EXISTS v2_job_runtime CASCADE; diff --git a/backend/migrations/20250107121225_queue_v2.up.sql b/backend/migrations/20250107121225_queue_v2.up.sql index e89ebce93966a..1c1f418328ab9 100644 --- a/backend/migrations/20250107121225_queue_v2.up.sql +++ b/backend/migrations/20250107121225_queue_v2.up.sql @@ -1,4 +1,14 @@ --- Add up migration script here +-- no-transaction + +-- TODO(uael): REMOVE!!!!!!!!! +-- DO $$ BEGIN +-- IF EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = 'completed_job') THEN +-- DELETE FROM completed_job WHERE created_at < now() - interval '5 minutes'; +-- DELETE FROM job WHERE NOT EXISTS (SELECT 1 FROM completed_job WHERE completed_job.id = job.id); +-- END IF; +-- END $$; + +START TRANSACTION; CREATE TABLE IF NOT EXISTS v2_job_runtime ( id UUID REFERENCES queue (id) ON DELETE CASCADE PRIMARY KEY NOT NULL, -- Metrics fields: @@ -44,6 +54,7 @@ DO $$ BEGIN EXCEPTION WHEN duplicate_object THEN null; END $$; +COMMIT; -- 1. v1 -> v2 synchronization, in one transaction: -- 1.1. `queue` to `v2_job_queue` @@ -92,12 +103,11 @@ END $$; * ); */ START TRANSACTION; -DO $$ BEGIN -LOOP -BEGIN +DO $tx$ BEGIN IF EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = 'v2_job_queue') THEN - EXIT; -- Success, exit the loop + RETURN; END IF; + -- 1.1.1. Add new columns to `queue` ALTER TABLE queue ADD COLUMN IF NOT EXISTS worker VARCHAR(255); @@ -133,13 +143,10 @@ BEGIN ALTER TABLE queue RENAME COLUMN flow_step_id TO __flow_step_id; ALTER TABLE queue RENAME COLUMN cache_ttl TO __cache_ttl; - -- 1.1.3. Alter columns in `queue` - ALTER TABLE queue ADD CONSTRAINT v2_job_queue_id_fkey FOREIGN KEY (id) REFERENCES job (id) ON DELETE CASCADE; - - -- 1.1.4. Rename `queue` to `v2_job_queue` + -- 1.1.3. Rename `queue` to `v2_job_queue` ALTER TABLE queue RENAME TO v2_job_queue; - -- 1.1.5. Create a `queue` view for backward compatibility + -- 1.1.4. Create a `queue` view for backward compatibility CREATE OR REPLACE VIEW queue AS ( SELECT id, @@ -186,14 +193,7 @@ BEGIN priority FROM v2_job_queue ); - EXIT; -- Success, exit the loop -EXCEPTION - WHEN deadlock_detected THEN RAISE NOTICE 'Deadlock detected. Retry'; - WHEN others THEN RAISE; -- Re-raise unexpected exceptions - PERFORM pg_sleep(0.5); -END; -END LOOP; -END $$; +END $tx$; COMMIT; /** @@ -220,11 +220,9 @@ COMMIT; * ); */ START TRANSACTION; -DO $$ BEGIN -LOOP -BEGIN +DO $tx$ BEGIN IF EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = 'v2_job_completed') THEN - EXIT; -- Success, exit the loop + RETURN; END IF; -- 1.2.1. Add new columns to `completed_job` @@ -263,7 +261,6 @@ BEGIN -- 1.2.4. Alter columns in `completed_job` ALTER TABLE completed_job ALTER COLUMN started_at DROP NOT NULL; - ALTER TABLE completed_job ADD CONSTRAINT v2_job_completed_id_fkey FOREIGN KEY (id) REFERENCES job (id) ON DELETE CASCADE; -- 1.2.5. Rename `completed_job` to `v2_job_completed` ALTER TABLE completed_job RENAME TO v2_job_completed; @@ -306,14 +303,7 @@ BEGIN __priority AS priority FROM v2_job_completed ); - EXIT; -- Success, exit the loop -EXCEPTION - WHEN deadlock_detected THEN RAISE NOTICE 'Deadlock detected. Retry'; - WHEN others THEN RAISE; -- Re-raise unexpected exceptions - PERFORM pg_sleep(0.5); -END; -END LOOP; -END $$; +END $tx$; COMMIT; /** @@ -369,11 +359,9 @@ COMMIT; * ); */ START TRANSACTION; -DO $$ BEGIN -LOOP -BEGIN +DO $tx$ BEGIN IF EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = 'v2_job') THEN - EXIT; -- Success, exit the loop + RETURN; END IF; -- 1.3.1. Add new columns to `job` @@ -453,14 +441,7 @@ BEGIN GRANT DELETE, INSERT, REFERENCES, SELECT, TRIGGER, TRUNCATE, UPDATE ON v2_job TO windmill_user; GRANT DELETE, INSERT, REFERENCES, SELECT, TRIGGER, TRUNCATE, UPDATE ON v2_job TO windmill_admin; - EXIT; -- Success, exit the loop -EXCEPTION - WHEN deadlock_detected THEN RAISE NOTICE 'Deadlock detected. Retry'; - WHEN others THEN RAISE; -- Re-raise unexpected exceptions - PERFORM pg_sleep(0.5); -END; -END LOOP; -END $$; +END $tx$; COMMIT; -- 1.4. On every insert to `v2_job_queue`, insert to `v2_job`, `v2_job_runtime` and `v2_job_flow_runtime` as well @@ -618,9 +599,11 @@ SET completed_at = started_at + (interval '1 millisecond' * duration_ms), WHERE status IS NULL; -- Set `status` and `completed_at` as non null +START TRANSACTION; ALTER TABLE v2_job_completed ALTER COLUMN status SET NOT NULL; ALTER TABLE v2_job_completed ALTER COLUMN completed_at SET DEFAULT now(); ALTER TABLE v2_job_completed ALTER COLUMN completed_at SET NOT NULL; +COMMIT; -- 2.3. Migrate `v2_job_queue` old columns to `v2_job` INSERT INTO v2_job ( @@ -721,8 +704,10 @@ WHERE v2_job.id = v2_job_completed.id AND v2_job.id IN ( WHERE v2_job.created_by = 'missing' ); +START TRANSACTION; +ALTER TABLE v2_job ALTER COLUMN tag SET DEFAULT 'other'; ALTER TABLE v2_job ALTER COLUMN tag SET NOT NULL; - +COMMIT; -- 2.3. Migrate `v2_job_queue` old columns to `v2_job_runtime` START TRANSACTION; @@ -731,7 +716,8 @@ INSERT INTO v2_job_runtime ( ) SELECT id, __last_ping, __mem_peak FROM v2_job_queue -WHERE __last_ping IS NOT NULL OR __mem_peak IS NOT NULL +WHERE NOT running AND __last_ping IS NOT NULL OR __mem_peak IS NOT NULL +FOR UPDATE SKIP LOCKED ON CONFLICT (id) DO NOTHING; COMMIT; @@ -742,7 +728,8 @@ INSERT INTO v2_job_flow_runtime ( ) SELECT id, __flow_status, __leaf_jobs FROM v2_job_queue -WHERE __flow_status IS NOT NULL OR __leaf_jobs IS NOT NULL +WHERE NOT running AND __flow_status IS NOT NULL OR __leaf_jobs IS NOT NULL +FOR UPDATE SKIP LOCKED ON CONFLICT (id) DO NOTHING; COMMIT;