From 45b845da1d7afbe2b1a515f52fca8320ab1e1327 Mon Sep 17 00:00:00 2001 From: HugoCasa Date: Mon, 6 Jan 2025 14:14:19 +0100 Subject: [PATCH] prevent cancelling schedule next tick + remove need of flow_status inside add_completed_job --- ...b7527ececad87d218182ff723e6a6c43ecd50.json | 24 ++++++++++ backend/windmill-queue/src/jobs.rs | 46 ++++++++++++++----- 2 files changed, 59 insertions(+), 11 deletions(-) create mode 100644 backend/.sqlx/query-e4e87539ae18f7e5c6bd9a28d16b7527ececad87d218182ff723e6a6c43ecd50.json diff --git a/backend/.sqlx/query-e4e87539ae18f7e5c6bd9a28d16b7527ececad87d218182ff723e6a6c43ecd50.json b/backend/.sqlx/query-e4e87539ae18f7e5c6bd9a28d16b7527ececad87d218182ff723e6a6c43ecd50.json new file mode 100644 index 0000000000000..3148f59bb44f6 --- /dev/null +++ b/backend/.sqlx/query-e4e87539ae18f7e5c6bd9a28d16b7527ececad87d218182ff723e6a6c43ecd50.json @@ -0,0 +1,24 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT \n flow_status->>'step' = '0' \n AND (\n jsonb_array_length(flow_status->'modules') = 0 \n OR flow_status->'modules'->0->>'type' = 'WaitingForPriorSteps' \n OR (\n flow_status->'modules'->0->>'type' = 'Failure' \n AND flow_status->'modules'->0->>'job' = $1\n )\n )\n FROM completed_job WHERE id = $2 AND workspace_id = $3", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "?column?", + "type_info": "Bool" + } + ], + "parameters": { + "Left": [ + "Text", + "Uuid", + "Text" + ] + }, + "nullable": [ + null + ] + }, + "hash": "e4e87539ae18f7e5c6bd9a28d16b7527ececad87d218182ff723e6a6c43ecd50" +} diff --git a/backend/windmill-queue/src/jobs.rs b/backend/windmill-queue/src/jobs.rs index a006de49411e3..0a847e4b2c6c3 100644 --- a/backend/windmill-queue/src/jobs.rs +++ b/backend/windmill-queue/src/jobs.rs @@ -26,6 +26,7 @@ use uuid::Uuid; use windmill_audit::audit_ee::{audit_log, AuditAuthor}; use windmill_audit::ActionKind; +use windmill_common::utils::now_from_db; use windmill_common::{ auth::{fetch_authed_from_permissioned_as, permissioned_as_to_username}, cache::{self, FlowData}, @@ -226,6 +227,20 @@ pub async fn cancel_job<'c>( } } + // prevent cancelling a future tick of a schedule + if let Some(schedule_path) = job.schedule_path.as_ref() { + let now = now_from_db(&mut *tx).await?; + if job.scheduled_for > now { + return Err(Error::BadRequest( + format!( + "Cannot cancel a future tick of a schedule, cancel the schedule direcly ({})", + schedule_path + ) + .to_string(), + )); + } + } + let job = Arc::new(job); // get all children @@ -708,17 +723,26 @@ pub async fn add_completed_job( _skip_downstream_error_handlers = schedule.ws_error_handler_muted; } - // script or flow that failed on start and might not have been rescheduled + // for scripts, always try to schedule next tick + // for flows, only try to schedule next tick here if flow failed and because first handle_flow failed (step = 0, modules[0] = {type: 'Failure', 'job': uuid::nil()}) or job was cancelled before first handle_flow was called (step = 0, modules = [] OR modules[0].type == 'WaitingForPriorSteps') + // otherwise flow rescheduling is done inside handle_flow let schedule_next_tick = !queued_job.is_flow() - || { - let flow_status = queued_job.parse_flow_status(); - flow_status.is_some_and(|fs| { - fs.step == 0 - && fs.modules.get(0).is_some_and(|m| { - matches!(m, FlowStatusModule::WaitingForPriorSteps { .. }) || matches!(m, FlowStatusModule::Failure { job, ..} if job == &Uuid::nil()) - }) - }) - }; + || !success && sqlx::query_scalar!( + "SELECT + flow_status->>'step' = '0' + AND ( + jsonb_array_length(flow_status->'modules') = 0 + OR flow_status->'modules'->0->>'type' = 'WaitingForPriorSteps' + OR ( + flow_status->'modules'->0->>'type' = 'Failure' + AND flow_status->'modules'->0->>'job' = $1 + ) + ) + FROM completed_job WHERE id = $2 AND workspace_id = $3", + Uuid::nil().to_string(), + &queued_job.id, + &queued_job.workspace_id + ).fetch_optional(&mut *tx).await?.flatten().unwrap_or(false); if schedule_next_tick { if let Err(err) = handle_maybe_scheduled_job( @@ -1310,7 +1334,7 @@ pub async fn handle_maybe_scheduled_job<'c>( Error::QuotaExceeded(_) => Err(err), _ => { report_error_to_workspace_handler_or_critical_side_channel(job, db, - format!("Could not schedule next job for {} and could not disable schedule with err {}. Will retry", schedule.path, disable_err) + format!("Could not schedule next job for {} and could not disable schedule with err {}.", schedule.path, disable_err) ).await; Err(to_anyhow(disable_err).into()) }