diff --git a/backend/windmill-common/src/worker.rs b/backend/windmill-common/src/worker.rs index 1f3dc68411139..9a4064ff1d80c 100644 --- a/backend/windmill-common/src/worker.rs +++ b/backend/windmill-common/src/worker.rs @@ -335,6 +335,8 @@ pub struct PythonAnnotations { pub no_uv: bool, pub no_uv_install: bool, pub no_uv_compile: bool, + + pub no_postinstall: bool, } #[annotations("//")] diff --git a/backend/windmill-worker/src/python_executor.rs b/backend/windmill-worker/src/python_executor.rs index 17feeff648143..aa2efb49afdb0 100644 --- a/backend/windmill-worker/src/python_executor.rs +++ b/backend/windmill-worker/src/python_executor.rs @@ -1,4 +1,9 @@ -use std::{collections::HashMap, fs, process::Stdio}; +use std::{ + collections::{HashMap, HashSet}, + fs, + path::Path, + process::Stdio, +}; use itertools::Itertools; use regex::Regex; @@ -16,7 +21,7 @@ use windmill_common::{ error::{self, Error}, jobs::{QueuedJob, PREPROCESSOR_FAKE_ENTRYPOINT}, utils::calculate_hash, - worker::{write_file, WORKER_CONFIG}, + worker::{write_file, PythonAnnotations, WORKER_CONFIG}, DB, }; @@ -359,6 +364,129 @@ pub async fn uv_pip_compile( Ok(lockfile) } +/** + Iterate over all python paths and if same folder has same name multiple times, + then merge the content and put to /site-packages + + Solves problem with imports for some dependencies. + + Default layout (/windmill/cache/): + + dep==x.y.z + └── X + └── A + dep-ext==x.y.z + └── X + └── B + + In this case python would be confused with finding B module. + + This function will convert it to (/): + + site-packages + └── X + ├── A + └── B + + This way python has no problems with finding correct module +*/ +#[tracing::instrument(level = "trace", skip_all)] +async fn postinstall( + additional_python_paths: &mut Vec, + job_dir: &str, + job: &QueuedJob, + db: &sqlx::Pool, +) -> windmill_common::error::Result<()> { + // It is guranteed that additional_python_paths only contains paths within windmill/cache/ + // All other paths you would usually expect in PYTHONPATH are NOT included. These are added in downstream + // + // > + let mut lookup_table: HashMap> = HashMap::new(); + // e.g.: <"requests", ["/tmp/windmill/cache/python_311/requests==1.0.0"]> + for path in additional_python_paths.iter() { + for entry in fs::read_dir(&path)? { + let entry = entry?; + // Ignore all files, we only need directories. + // We cannot merge files. + if entry.file_type()?.is_dir() { + // Short name, e.g.: requests + let name = entry + .file_name() + .to_str() + .ok_or(anyhow::anyhow!("Cannot convert OsString to String"))? + .to_owned(); + + if name == "bin" || name.contains("dist-info") { + continue; + } + + if let Some(existing_paths) = lookup_table.get_mut(&name) { + tracing::info!( + "Found existing package name: {:?} in {}", + entry.file_name(), + path + ); + existing_paths.push(path.to_owned()) + } else { + lookup_table.insert(name, vec![path.to_owned()]); + } + } + } + } + let mut paths_to_remove: HashSet = HashSet::new(); + // Copy to shared dir + for existing_paths in lookup_table.values() { + if existing_paths.len() == 1 { + // There is only single path for given name + // So we skip it + continue; + } + + for path in existing_paths { + copy_dir_recursively( + Path::new(path), + &std::path::PathBuf::from(job_dir).join("site-packages"), + )?; + paths_to_remove.insert(path.to_owned()); + } + } + + if !paths_to_remove.is_empty() { + append_logs( + &job.id, + &job.workspace_id, + "\n\nCopying some packages from cache to job_dir...\n".to_string(), + db, + ) + .await; + // Remove PATHs we just moved + additional_python_paths.retain(|e| !paths_to_remove.contains(e)); + // Instead add shared path + additional_python_paths.insert(0, format!("{job_dir}/site-packages")); + } + Ok(()) +} + +fn copy_dir_recursively(src: &Path, dst: &Path) -> windmill_common::error::Result<()> { + if !dst.exists() { + fs::create_dir_all(dst)?; + } + + for entry in fs::read_dir(src)? { + let entry = entry?; + let src_path = entry.path(); + let dst_path = dst.join(entry.file_name()); + + if src_path.is_dir() { + copy_dir_recursively(&src_path, &dst_path)?; + } else { + fs::copy(&src_path, &dst_path)?; + } + } + + Ok(()) +} + #[tracing::instrument(level = "trace", skip_all)] pub async fn handle_python_job( requirements_o: Option, @@ -378,7 +506,7 @@ pub async fn handle_python_job( occupancy_metrics: &mut OccupancyMetrics, ) -> windmill_common::error::Result> { let script_path = crate::common::use_flow_root_path(job.script_path()); - let additional_python_paths = handle_python_deps( + let mut additional_python_paths = handle_python_deps( job_dir, requirements_o, inner_content, @@ -394,6 +522,12 @@ pub async fn handle_python_job( ) .await?; + if !PythonAnnotations::parse(inner_content).no_postinstall { + if let Err(e) = postinstall(&mut additional_python_paths, job_dir, job, db).await { + tracing::error!("Postinstall stage has failed. Reason: {e}"); + } + } + append_logs( &job.id, &job.workspace_id,