From 060ff8f750d5d3f1aec3e207883b3b5fdc619cb4 Mon Sep 17 00:00:00 2001 From: Abel Lucas Date: Mon, 6 Jan 2025 11:11:51 +0100 Subject: [PATCH] backend: monitor and push missing schedule next jobs --- backend/src/monitor.rs | 28 ++++++++++++++++++++++++++++ backend/windmill-queue/src/jobs.rs | 15 ++------------- 2 files changed, 30 insertions(+), 13 deletions(-) diff --git a/backend/src/monitor.rs b/backend/src/monitor.rs index 0aeb29df56ea5..57c6e2b5524fe 100644 --- a/backend/src/monitor.rs +++ b/backend/src/monitor.rs @@ -1268,6 +1268,12 @@ pub async fn monitor_db( update_min_version(db).await; }; + let missing_schedule_f = async { + if let Err(err) = missing_schedule(db).await { + tracing::error!("Error checking missing schedule: {:?}", err); + } + }; + join!( expired_items_f, zombie_jobs_f, @@ -1277,6 +1283,7 @@ pub async fn monitor_db( jobs_waiting_alerts_f, apply_autoscaling_f, update_min_worker_version_f, + missing_schedule_f ); } @@ -1823,3 +1830,24 @@ pub async fn reload_jwt_secret_setting(db: &DB) -> error::Result<()> { Ok(()) } + +pub async fn missing_schedule(db: &DB) -> error::Result<()> { + use windmill_common::schedule::Schedule; + use windmill_queue::schedule::push_scheduled_job; + + // find schedules that are enabled but w/o a corresponding job in the queue. + let schedules = sqlx::query_as::<_, Schedule>( + r#"SELECT s.* FROM schedule s WHERE enabled = true AND NOT EXISTS ( + SELECT 1 FROM queue WHERE workspace_id = s.workspace_id AND schedule_path = s.path + )"#, + ) + .fetch_all(db) + .await?; + + let mut tx = db.begin().await?; + for schedule in schedules { + tx = push_scheduled_job(db, tx, &schedule, None).await?; + } + tx.commit().await?; + Ok(()) +} diff --git a/backend/windmill-queue/src/jobs.rs b/backend/windmill-queue/src/jobs.rs index a006de49411e3..0139fa26eaf0e 100644 --- a/backend/windmill-queue/src/jobs.rs +++ b/backend/windmill-queue/src/jobs.rs @@ -708,19 +708,8 @@ pub async fn add_completed_job( _skip_downstream_error_handlers = schedule.ws_error_handler_muted; } - // script or flow that failed on start and might not have been rescheduled - let schedule_next_tick = !queued_job.is_flow() - || { - let flow_status = queued_job.parse_flow_status(); - flow_status.is_some_and(|fs| { - fs.step == 0 - && fs.modules.get(0).is_some_and(|m| { - matches!(m, FlowStatusModule::WaitingForPriorSteps { .. }) || matches!(m, FlowStatusModule::Failure { job, ..} if job == &Uuid::nil()) - }) - }) - }; - - if schedule_next_tick { + // script only, flow schedules are handled in `handle_flow`. + if !queued_job.is_flow() { if let Err(err) = handle_maybe_scheduled_job( db, queued_job,