diff --git a/backend/tests/worker.rs b/backend/tests/worker.rs index 559f82ecc77b2..c9a162635b656 100644 --- a/backend/tests/worker.rs +++ b/backend/tests/worker.rs @@ -3848,6 +3848,7 @@ mod job_payload { concurrency_time_window_s: None, cache_ttl: None, dedicated_worker: None, + path: "f/system/hello/test-0".into(), }) .arg("world", json!("foo")) .run_until_complete(&db, port) @@ -3867,6 +3868,7 @@ mod job_payload { concurrency_time_window_s: None, cache_ttl: None, dedicated_worker: None, + path: "f/system/hello/test-0".into(), }) .arg("hello", json!("You know nothing Jean Neige")) .run_until_complete(&db, port) diff --git a/backend/windmill-common/src/jobs.rs b/backend/windmill-common/src/jobs.rs index 086976e701e0f..dc371d319f464 100644 --- a/backend/windmill-common/src/jobs.rs +++ b/backend/windmill-common/src/jobs.rs @@ -281,6 +281,7 @@ pub enum JobPayload { concurrency_time_window_s: Option, cache_ttl: Option, dedicated_worker: Option, + path: String, }, FlowNode { id: FlowNodeId, // flow_node(id). diff --git a/backend/windmill-queue/src/jobs.rs b/backend/windmill-queue/src/jobs.rs index 9df0b70d897d8..ffedbda16371a 100644 --- a/backend/windmill-queue/src/jobs.rs +++ b/backend/windmill-queue/src/jobs.rs @@ -2852,9 +2852,10 @@ pub async fn push<'c, 'd>( concurrency_time_window_s, cache_ttl, dedicated_worker, + path, } => ( Some(id.0), - None, + Some(path), None, JobKind::FlowScript, None, diff --git a/backend/windmill-worker/src/worker_flow.rs b/backend/windmill-worker/src/worker_flow.rs index 7d7ea8d0acd4d..8663da5c38fe7 100644 --- a/backend/windmill-worker/src/worker_flow.rs +++ b/backend/windmill-worker/src/worker_flow.rs @@ -2978,6 +2978,18 @@ fn payload_from_modules<'a>( }) } +fn get_path(flow_job: &QueuedJob, status: &FlowStatus, module: &FlowModule) -> String { + if status + .preprocessor_module + .as_ref() + .is_some_and(|x| x.id() == module.id) + { + format!("{}/preprocessor", flow_job.script_path()) + } else { + format!("{}/step-{}", flow_job.script_path(), status.step) + } +} + async fn compute_next_flow_transform( arc_flow_job_args: Marc>>, arc_last_job_result: Arc>, @@ -3023,6 +3035,7 @@ async fn compute_next_flow_transform( if is_skipped { return trivial_next_job(JobPayload::Identity); } + match module.get_value()? { FlowModuleValue::Identity => trivial_next_job(JobPayload::Identity), FlowModuleValue::Flow { path, .. } => { @@ -3052,19 +3065,10 @@ async fn compute_next_flow_transform( concurrency_time_window_s, .. } => { - let path = path.clone().or_else(|| { - if status - .preprocessor_module - .as_ref() - .is_some_and(|x| x.id() == module.id) - { - Some(format!("{}/preprocessor", flow_job.script_path())) - } else { - Some(format!("{}/step-{}", flow_job.script_path(), status.step)) - } - }); + let path = get_path(flow_job, status, module); + let payload = raw_script_to_payload( - path, + Some(path), content, language, lock, @@ -3089,6 +3093,8 @@ async fn compute_next_flow_transform( concurrency_time_window_s, .. } => { + let path = get_path(flow_job, status, module); + let payload = JobPayloadWithTag { payload: JobPayload::FlowScript { id, @@ -3098,6 +3104,7 @@ async fn compute_next_flow_transform( concurrency_time_window_s, cache_ttl: module.cache_ttl.map(|x| x as i32), dedicated_worker: None, + path, }, tag: tag.clone(), delete_after_use, @@ -3699,6 +3706,8 @@ async fn payload_from_simple_module( concurrency_time_window_s, cache_ttl: module.cache_ttl.map(|x| x as i32), dedicated_worker: None, + path: inner_path + .unwrap_or_else(|| format!("{}/simple-flow", flow_job.script_path())), }, tag, delete_after_use,