Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(python): Cancel and Start again within 1s caused module not found [v2] #5007

Merged
merged 12 commits into from
Jan 3, 2025
35 changes: 21 additions & 14 deletions backend/windmill-worker/src/python_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::{
};

use anyhow::anyhow;
use futures::lock::Mutex;
use itertools::Itertools;
use regex::Regex;
use serde_json::value::RawValue;
Expand Down Expand Up @@ -35,6 +36,8 @@ use windmill_common::variables::get_secret_value_as_admin;
use windmill_queue::{append_logs, CanceledBy};

lazy_static::lazy_static! {
static ref BUSY_WITH_UV_INSTALL: Mutex<()> = Mutex::new(());

static ref PYTHON_PATH: String =
std::env::var("PYTHON_PATH").unwrap_or_else(|_| "/usr/local/bin/python3".to_string());

Expand Down Expand Up @@ -1245,7 +1248,6 @@ async fn spawn_uv_install(
"--target",
venv_p,
"--no-cache",
"-q",
]
};

Expand Down Expand Up @@ -1354,6 +1356,7 @@ pub async fn handle_python_reqs(
mut no_uv_install: bool,
is_ansible: bool,
) -> error::Result<Vec<String>> {
let lock = BUSY_WITH_UV_INSTALL.lock().await;
let counter_arc = Arc::new(tokio::sync::Mutex::new(0));
// Append logs with line like this:
// [9/21] + requests==2.32.3 << (S3) | in 57ms
Expand Down Expand Up @@ -1484,7 +1487,6 @@ pub async fn handle_python_reqs(
req.replace(' ', "").replace('/', "").replace(':', "")
);
if metadata(&venv_p).await.is_ok() {
// If dir exists skip installation and push path to output
req_paths.push(venv_p);
in_cache.push(req.to_string());
} else {
Expand Down Expand Up @@ -1518,6 +1520,12 @@ pub async fn handle_python_reqs(
let pids = Arc::new(tokio::sync::Mutex::new(vec![None; total_to_install]));
let mem_peak_thread_safe = Arc::new(tokio::sync::Mutex::new(0));
{
// when we cancel the job, it has up to 1 second window before actually getting cancelled
// Thus the directory with wheel in windmill's cache cleaned only after that.
// If we manage to start new job during that period windmill might see that wanted wheel is already there (because we have not cleaned it yet)
// and write it to installed wheels, meanwhile previous job will clean that wheel.
// To fix that we create lock, which will pipeline all uv installs on worker
let _lock = lock;
let pids = pids.clone();
let mem_peak_thread_safe = mem_peak_thread_safe.clone();
tokio::spawn(async move {
Expand Down Expand Up @@ -1760,10 +1768,12 @@ pub async fn handle_python_reqs(
}
};

let mut stderr = uv_install_proccess
let mut stderr_buf = String::new();
let mut stderr_pipe = uv_install_proccess
.stderr
.take()
.ok_or(anyhow!("Cannot take stderr from uv_install_proccess"))?;
let stderr_future = stderr_pipe.read_to_string(&mut stderr_buf);

if let Some(pid) = pids.lock().await.get_mut(i) {
*pid = uv_install_proccess.id();
Expand All @@ -1780,9 +1790,12 @@ pub async fn handle_python_reqs(
uv_install_proccess.kill().await?;
pids.lock().await.get_mut(i).and_then(|e| e.take());
return Err(anyhow::anyhow!("uv pip install was canceled"));
}
// Finished
exitstatus = uv_install_proccess.wait() => match exitstatus {
},
(_, exitstatus) = async {
// See tokio::process::Child::wait_with_output() for more context
// Sometimes uv_install_proccess.wait() is not exiting if stderr is not awaited before it :/
(stderr_future.await, uv_install_proccess.wait().await)
} => match exitstatus {
Ok(status) => if !status.success() {
tracing::warn!(
workspace_id = %w_id,
Expand All @@ -1791,24 +1804,18 @@ pub async fn handle_python_reqs(
status.code()
);

let mut buf = String::new();
stderr.read_to_string(&mut buf).await.unwrap_or_else(|_|{
buf = "Cannot read stderr to string".to_owned();
0
});

append_logs(
&job_id,
w_id,
format!(
"\nError while installing {}:\n{buf}",
"\nError while installing {}:\n{stderr_buf}",
&req
),
db,
)
.await;
pids.lock().await.get_mut(i).and_then(|e| e.take());
return Err(anyhow!(buf));
return Err(anyhow!(stderr_buf));
},
Err(e) => {
tracing::error!(
Expand Down