Skip to content

Commit

Permalink
prevent cancelling schedule next tick + remove need of flow_status in…
Browse files Browse the repository at this point in the history
…side add_completed_job
  • Loading branch information
HugoCasa committed Jan 6, 2025
1 parent ec2ff52 commit 45b845d
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 11 deletions.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

46 changes: 35 additions & 11 deletions backend/windmill-queue/src/jobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use uuid::Uuid;
use windmill_audit::audit_ee::{audit_log, AuditAuthor};
use windmill_audit::ActionKind;

use windmill_common::utils::now_from_db;
use windmill_common::{
auth::{fetch_authed_from_permissioned_as, permissioned_as_to_username},
cache::{self, FlowData},
Expand Down Expand Up @@ -226,6 +227,20 @@ pub async fn cancel_job<'c>(
}
}

// prevent cancelling a future tick of a schedule
if let Some(schedule_path) = job.schedule_path.as_ref() {
let now = now_from_db(&mut *tx).await?;
if job.scheduled_for > now {
return Err(Error::BadRequest(
format!(
"Cannot cancel a future tick of a schedule, cancel the schedule direcly ({})",
schedule_path
)
.to_string(),
));
}
}

let job = Arc::new(job);

// get all children
Expand Down Expand Up @@ -708,17 +723,26 @@ pub async fn add_completed_job<T: Serialize + Send + Sync + ValidableJson>(
_skip_downstream_error_handlers = schedule.ws_error_handler_muted;
}

// script or flow that failed on start and might not have been rescheduled
// for scripts, always try to schedule next tick
// for flows, only try to schedule next tick here if flow failed and because first handle_flow failed (step = 0, modules[0] = {type: 'Failure', 'job': uuid::nil()}) or job was cancelled before first handle_flow was called (step = 0, modules = [] OR modules[0].type == 'WaitingForPriorSteps')
// otherwise flow rescheduling is done inside handle_flow
let schedule_next_tick = !queued_job.is_flow()
|| {
let flow_status = queued_job.parse_flow_status();
flow_status.is_some_and(|fs| {
fs.step == 0
&& fs.modules.get(0).is_some_and(|m| {
matches!(m, FlowStatusModule::WaitingForPriorSteps { .. }) || matches!(m, FlowStatusModule::Failure { job, ..} if job == &Uuid::nil())
})
})
};
|| !success && sqlx::query_scalar!(
"SELECT
flow_status->>'step' = '0'
AND (
jsonb_array_length(flow_status->'modules') = 0
OR flow_status->'modules'->0->>'type' = 'WaitingForPriorSteps'
OR (
flow_status->'modules'->0->>'type' = 'Failure'
AND flow_status->'modules'->0->>'job' = $1
)
)
FROM completed_job WHERE id = $2 AND workspace_id = $3",
Uuid::nil().to_string(),
&queued_job.id,
&queued_job.workspace_id
).fetch_optional(&mut *tx).await?.flatten().unwrap_or(false);

if schedule_next_tick {
if let Err(err) = handle_maybe_scheduled_job(
Expand Down Expand Up @@ -1310,7 +1334,7 @@ pub async fn handle_maybe_scheduled_job<'c>(
Error::QuotaExceeded(_) => Err(err),
_ => {
report_error_to_workspace_handler_or_critical_side_channel(job, db,
format!("Could not schedule next job for {} and could not disable schedule with err {}. Will retry", schedule.path, disable_err)
format!("Could not schedule next job for {} and could not disable schedule with err {}.", schedule.path, disable_err)
).await;
Err(to_anyhow(disable_err).into())
}
Expand Down

0 comments on commit 45b845d

Please sign in to comment.