diff --git a/backend/windmill-api/openapi.yaml b/backend/windmill-api/openapi.yaml index bd5caa8e5b557..94579a717bf41 100644 --- a/backend/windmill-api/openapi.yaml +++ b/backend/windmill-api/openapi.yaml @@ -12918,6 +12918,8 @@ components: execution_mode: type: string enum: [viewer, publisher, anonymous] + raw_app: + type: boolean required: - id - workspace_id diff --git a/backend/windmill-api/src/apps.rs b/backend/windmill-api/src/apps.rs index a0bbfca3ef994..5275d60b884e2 100644 --- a/backend/windmill-api/src/apps.rs +++ b/backend/windmill-api/src/apps.rs @@ -79,6 +79,7 @@ pub fn workspaced_service() -> Router { .route("/get/draft/*path", get(get_app_w_draft)) .route("/secret_of/*path", get(get_secret_id)) .route("/get/v/*id", get(get_app_by_id)) + .route("/get_data/v/*id", get(get_raw_app_data)) .route("/exists/*path", get(exists_app)) .route("/update/*path", post(update_app)) .route("/delete/*path", delete(delete_app)) @@ -372,36 +373,43 @@ async fn list_apps( Ok(Json(rows)) } -async fn get_raw_app_data(Path((w_id, version_id)): Path<(String, i64)>) -> Result { +async fn get_raw_app_data(Path((w_id, version_id)): Path<(String, String)>) -> Result { let file_path = format!("/home/rfiszel/wmill/{}/{}", w_id, version_id); let file = tokio::fs::File::open(file_path).await?; let stream = tokio_util::io::ReaderStream::new(file); - let res = Response::builder().header(http::header::CONTENT_TYPE, "text/javascript"); + let res = Response::builder().header( + http::header::CONTENT_TYPE, + if version_id.ends_with(".css") { + "text/css" + } else { + "text/javascript" + }, + ); Ok(res.body(Body::from_stream(stream)).unwrap()) } -async fn get_app_version( - authed: ApiAuthed, - Extension(user_db): Extension, - Path((w_id, path)): Path<(String, StripPath)>, -) -> JsonResult { - let path = path.to_path(); - let mut tx = user_db.begin(&authed).await?; - - let version_o = sqlx::query_scalar!( - "SELECT app.versions[array_upper(app.versions, 1)] as version FROM app - WHERE app.path = $1 AND app.workspace_id = $2", - path, - &w_id, - ) - .fetch_optional(&mut *tx) - .await? - .flatten(); - tx.commit().await?; - - let version = not_found_if_none(version_o, "App", path)?; - Ok(Json(version)) -} +// async fn get_app_version( +// authed: ApiAuthed, +// Extension(user_db): Extension, +// Path((w_id, path)): Path<(String, StripPath)>, +// ) -> JsonResult { +// let path = path.to_path(); +// let mut tx = user_db.begin(&authed).await?; + +// let version_o = sqlx::query_scalar!( +// "SELECT app.versions[array_upper(app.versions, 1)] as version FROM app +// WHERE app.path = $1 AND app.workspace_id = $2", +// path, +// &w_id, +// ) +// .fetch_optional(&mut *tx) +// .await? +// .flatten(); +// tx.commit().await?; + +// let version = not_found_if_none(version_o, "App", path)?; +// Ok(Json(version)) +// } async fn get_app( authed: ApiAuthed, diff --git a/backend/windmill-api/src/lib.rs b/backend/windmill-api/src/lib.rs index 7f5fd67a7f368..75d2a3fa303d2 100644 --- a/backend/windmill-api/src/lib.rs +++ b/backend/windmill-api/src/lib.rs @@ -86,7 +86,6 @@ mod kafka_triggers_ee; pub mod oauth2_ee; mod oidc_ee; mod raw_apps; -mod raw_apps_v2; mod resources; mod saml_ee; mod schedule; diff --git a/backend/windmill-api/src/raw_apps_v2.rs b/backend/windmill-api/src/raw_apps_v2.rs deleted file mode 100644 index a311a54af43d6..0000000000000 --- a/backend/windmill-api/src/raw_apps_v2.rs +++ /dev/null @@ -1,1893 +0,0 @@ -use std::{collections::HashMap, sync::Arc}; - -/* - * Author: Ruben Fiszel - * Copyright: Windmill Labs, Inc 2022 - * This file and its contents are licensed under the AGPLv3 License. - * Please see the included NOTICE for copyright information and - * LICENSE-AGPL for a copy of the license. - */ - -use crate::{ - db::{ApiAuthed, DB}, - resources::get_resource_value_interpolated_internal, - users::{require_owner_of_path, OptAuthed}, - utils::WithStarredInfoQuery, - variables::encrypt, - webhook_util::{WebhookMessage, WebhookShared}, - HTTP_CLIENT, -}; -#[cfg(feature = "parquet")] -use crate::{ - job_helpers_ee::{ - get_random_file_name, get_s3_resource, get_workspace_s3_resource, upload_file_from_req, - UploadFileResponse, - }, - users::fetch_api_authed_from_permissioned_as, -}; -use axum::{ - extract::{Extension, Json, Path, Query}, - response::IntoResponse, - routing::{delete, get, post}, - Router, -}; -use futures::future::{FutureExt, TryFutureExt}; -use hyper::StatusCode; -#[cfg(feature = "parquet")] -use itertools::Itertools; -use magic_crypt::MagicCryptTrait; -#[cfg(feature = "parquet")] -use object_store::{Attribute, Attributes}; -#[cfg(feature = "parquet")] -use regex::Regex; -use serde::{Deserialize, Serialize}; -use serde_json::{json, value::RawValue}; -use sha2::{Digest, Sha256}; -use sql_builder::{bind::Bind, SqlBuilder}; -use sqlx::{types::Uuid, FromRow}; -use std::str; -use windmill_audit::audit_ee::audit_log; -use windmill_audit::ActionKind; -#[cfg(feature = "parquet")] -use windmill_common::s3_helpers::build_object_store_client; -use windmill_common::{ - apps::{AppScriptId, ListAppQuery}, - cache::{self, future::FutureCachedExt}, - db::UserDB, - error::{to_anyhow, Error, JsonResult, Result}, - jobs::{get_payload_tag_from_prefixed_path, JobPayload, RawCode}, - users::username_to_permissioned_as, - utils::{ - http_get_from_hub, not_found_if_none, paginate, query_elems_from_hub, require_admin, - Pagination, StripPath, - }, - variables::{build_crypt, build_crypt_with_key_suffix}, - worker::{to_raw_value, CLOUD_HOSTED}, - HUB_BASE_URL, -}; - -use windmill_git_sync::{handle_deployment_metadata, DeployedObject}; -use windmill_queue::{push, PushArgs, PushArgsOwned, PushIsolationLevel}; - -pub fn workspaced_service() -> Router { - Router::new() - .route("/list", get(list_apps)) - .route("/list_search", get(list_search_apps)) - .route("/get/p/*path", get(get_app)) - .route("/get/lite/*path", get(get_app_lite)) - .route("/get/draft/*path", get(get_app_w_draft)) - .route("/secret_of/*path", get(get_secret_id)) - .route("/get/v/*id", get(get_app_by_id)) - .route("/exists/*path", get(exists_app)) - .route("/update/*path", post(update_app)) - .route("/delete/*path", delete(delete_app)) - .route("/create", post(create_app)) - .route("/history/p/*path", get(get_app_history)) - .route("/get_latest_version/*path", get(get_latest_version)) - .route("/history_update/a/:id/v/:version", post(update_app_history)) - .route("/custom_path_exists/*custom_path", get(custom_path_exists)) -} - -pub fn unauthed_service() -> Router { - Router::new() - .route("/execute_component/*path", post(execute_component)) - .route("/upload_s3_file/*path", post(upload_s3_file_from_app)) - .route("/public_app/:secret", get(get_public_app_by_secret)) - .route("/public_resource/*path", get(get_public_resource)) -} -// pub fn global_service() -> Router { -// Router::new() -// .route("/hub/list", get(list_hub_apps)) -// .route("/hub/get/:id", get(get_hub_app_by_id)) -// } - -#[derive(FromRow, Deserialize, Serialize)] -pub struct ListableApp { - pub id: i64, - pub workspace_id: String, - pub path: String, - pub summary: String, - pub version: i64, - pub extra_perms: serde_json::Value, - pub execution_mode: String, - pub starred: bool, - pub edited_at: Option>, - pub has_draft: bool, - #[serde(skip_serializing_if = "Option::is_none")] - pub draft_only: Option, - #[sqlx(default)] - #[serde(skip_serializing_if = "Option::is_none")] - pub deployment_msg: Option, -} - -#[derive(FromRow, Serialize, Deserialize)] -pub struct AppVersion { - pub id: i64, - pub app_id: Uuid, - pub value: sqlx::types::Json>, - pub created_by: String, - pub created_at: chrono::DateTime, -} - -#[derive(Serialize, Deserialize, FromRow)] -pub struct AppWithLastVersion { - pub id: i64, - pub path: String, - pub summary: String, - pub policy: sqlx::types::Json>, - pub versions: Vec, - pub value: sqlx::types::Json>, - pub created_by: String, - pub created_at: chrono::DateTime, - pub extra_perms: Option, - #[serde(skip_serializing_if = "Option::is_none")] - pub custom_path: Option, -} - -#[derive(Serialize, FromRow)] -pub struct AppWithLastVersionAndStarred { - #[sqlx(flatten)] - #[serde(flatten)] - pub app: AppWithLastVersion, - #[serde(skip_serializing_if = "Option::is_none")] - pub starred: Option, -} - -#[cfg(feature = "enterprise")] -#[derive(Serialize, FromRow)] -pub struct AppWithLastVersionAndWorkspace { - #[sqlx(flatten)] - #[serde(flatten)] - pub app: AppWithLastVersion, - pub workspace_id: String, -} - -#[derive(Serialize, Deserialize, FromRow)] -pub struct AppWithLastVersionAndDraft { - #[sqlx(flatten)] - #[serde(flatten)] - pub app: AppWithLastVersion, - #[serde(skip_serializing_if = "Option::is_none")] - pub draft: Option>>, - #[serde(skip_serializing_if = "Option::is_none")] - pub draft_only: Option, -} - -#[derive(Serialize)] -pub struct AppHistory { - pub app_id: i64, - pub version: i64, - #[serde(skip_serializing_if = "Option::is_none")] - pub deployment_msg: Option, -} - -#[derive(Deserialize)] -pub struct AppHistoryUpdate { - pub deployment_msg: Option, -} - -pub type StaticFields = HashMap>; -pub type OneOfFields = HashMap>>; -pub type AllowUserResources = Vec; - -#[derive(Serialize, Deserialize, Debug, PartialEq, Clone, Default)] -#[serde(rename_all = "lowercase")] -pub enum ExecutionMode { - #[default] - Anonymous, - Publisher, - Viewer, -} - -#[derive(Serialize, Deserialize, Debug, Clone, Default)] -pub struct PolicyTriggerableInputs { - static_inputs: StaticFields, - one_of_inputs: OneOfFields, - #[serde(default)] - allow_user_resources: AllowUserResources, -} - -#[derive(Serialize, Deserialize, Debug, Clone)] -pub struct S3Input { - allowed_resources: Vec, - allow_user_resources: bool, - allow_workspace_resource: bool, - file_key_regex: String, -} - -#[derive(Serialize, Deserialize, Debug, Clone, Default)] -pub struct Policy { - pub on_behalf_of: Option, - pub on_behalf_of_email: Option, - //paths: - // - script/ - // - flow/ - // - rawscript/ - #[serde(skip_serializing_if = "Option::is_none")] - pub triggerables: Option>, - #[serde(skip_serializing_if = "Option::is_none")] - pub triggerables_v2: Option>, - pub execution_mode: ExecutionMode, - pub s3_inputs: Option>, -} - -#[derive(Deserialize)] -pub struct CreateApp { - pub path: String, - pub summary: String, - pub value: sqlx::types::Json>, - pub policy: Policy, - pub draft_only: Option, - pub deployment_message: Option, - pub custom_path: Option, -} - -#[derive(Deserialize)] -pub struct EditApp { - pub path: Option, - pub summary: Option, - pub value: Option>>, - pub policy: Option, - pub deployment_message: Option, - pub custom_path: Option, -} - -#[derive(Serialize, FromRow)] -pub struct SearchApp { - path: String, - value: sqlx::types::Json>, -} -async fn list_search_apps( - authed: ApiAuthed, - Path(w_id): Path, - Extension(user_db): Extension, -) -> JsonResult> { - #[cfg(feature = "enterprise")] - let n = 1000; - - #[cfg(not(feature = "enterprise"))] - let n = 3; - let mut tx = user_db.begin(&authed).await?; - - let rows = sqlx::query_as::<_, SearchApp>( - "SELECT path, app_version.value from app LEFT JOIN app_version ON app_version.id = versions[array_upper(versions, 1)] WHERE workspace_id = $1 LIMIT $2", - ) - .bind(&w_id) - .bind(n) - .fetch_all(&mut *tx) - .await? - .into_iter() - .collect::>(); - tx.commit().await?; - Ok(Json(rows)) -} - -async fn list_apps( - authed: ApiAuthed, - Extension(user_db): Extension, - Path(w_id): Path, - Query(pagination): Query, - Query(lq): Query, -) -> JsonResult> { - let (per_page, offset) = paginate(pagination); - - let mut sqlb = SqlBuilder::select_from("app") - .fields(&[ - "app.id", - "app.workspace_id", - "app.path", - "app.summary", - "app.versions[array_upper(app.versions, 1)] as version", - "app.policy->>'execution_mode' as execution_mode", - "app_version.created_at as edited_at", - "app.extra_perms", - "favorite.path IS NOT NULL as starred", - "draft.path IS NOT NULL as has_draft", - "draft_only" - ]) - .left() - .join("favorite") - .on( - "favorite.favorite_kind = 'app' AND favorite.workspace_id = app.workspace_id AND favorite.path = app.path AND favorite.usr = ?" - .bind(&authed.username), - ) - .left() - .join("app_version") - .on( - "app_version.id = versions[array_upper(versions, 1)]" - ) - .left() - .join("draft") - .on( - "draft.path = app.path AND draft.workspace_id = app.workspace_id AND draft.typ = 'app'" - ) - .order_desc("favorite.path IS NOT NULL") - .order_by("app_version.created_at", true) - .and_where("app.workspace_id = ?".bind(&w_id)) - .offset(offset) - .limit(per_page) - .clone(); - - if lq.starred_only.unwrap_or(false) { - sqlb.and_where_is_not_null("favorite.path"); - } - - if let Some(path_start) = &lq.path_start { - sqlb.and_where_like_left("app.path", path_start); - } - - if let Some(path_exact) = &lq.path_exact { - sqlb.and_where_eq("app.path", "?".bind(path_exact)); - } - - if !lq.include_draft_only.unwrap_or(false) || authed.is_operator { - sqlb.and_where("app.draft_only IS NOT TRUE"); - } - - if lq.with_deployment_msg.unwrap_or(false) { - sqlb.join("deployment_metadata dm") - .left() - .on("dm.app_version = app.versions[array_upper(app.versions, 1)]") - .fields(&["dm.deployment_msg"]); - } - - let sql = sqlb.sql().map_err(|e| Error::InternalErr(e.to_string()))?; - let mut tx = user_db.begin(&authed).await?; - let rows = sqlx::query_as::<_, ListableApp>(&sql) - .fetch_all(&mut *tx) - .await?; - - tx.commit().await?; - - Ok(Json(rows)) -} - -async fn get_app( - authed: ApiAuthed, - Extension(user_db): Extension, - Path((w_id, path)): Path<(String, StripPath)>, - Query(query): Query, -) -> JsonResult { - let path = path.to_path(); - let mut tx = user_db.begin(&authed).await?; - - let app_o = if query.with_starred_info.unwrap_or(false) { - sqlx::query_as::<_, AppWithLastVersionAndStarred>( - "SELECT app.id, app.path, app.summary, app.versions, app.policy, app.custom_path, - app.extra_perms, app_version.value, - app_version.created_at, app_version.created_by, favorite.path IS NOT NULL as starred - FROM app - JOIN app_version - ON app_version.id = app.versions[array_upper(app.versions, 1)] - LEFT JOIN favorite - ON favorite.favorite_kind = 'app' - AND favorite.workspace_id = app.workspace_id - AND favorite.path = app.path - AND favorite.usr = $3 - WHERE app.path = $1 AND app.workspace_id = $2", - ) - .bind(path.to_owned()) - .bind(&w_id) - .bind(&authed.username) - .fetch_optional(&mut *tx) - .await? - } else { - sqlx::query_as::<_, AppWithLastVersionAndStarred>( - "SELECT app.id, app.path, app.summary, app.versions, app.policy, app.custom_path, - app.extra_perms, app_version.value, - app_version.created_at, app_version.created_by, NULL as starred - FROM app, app_version - WHERE app.path = $1 AND app.workspace_id = $2 AND app_version.id = app.versions[array_upper(app.versions, 1)]", - ) - .bind(path.to_owned()) - .bind(&w_id) - .fetch_optional(&mut *tx) - .await? - }; - tx.commit().await?; - - let app = not_found_if_none(app_o, "App", path)?; - Ok(Json(app)) -} - -async fn get_app_lite( - authed: ApiAuthed, - Extension(user_db): Extension, - Path((w_id, path)): Path<(String, StripPath)>, -) -> JsonResult { - let path = path.to_path(); - let mut tx = user_db.begin(&authed).await?; - - let app_o = sqlx::query_as::<_, AppWithLastVersion>( - "SELECT app.id, app.path, app.summary, app.versions, app.policy, app.custom_path, - app.extra_perms, coalesce(app_version_lite.value::json, app_version.value) as value, - app_version.created_at, app_version.created_by, NULL as starred - FROM app, app_version - LEFT JOIN app_version_lite ON app_version_lite.id = app_version.id - WHERE app.path = $1 AND app.workspace_id = $2 AND app_version.id = app.versions[array_upper(app.versions, 1)]", - ) - .bind(path.to_owned()) - .bind(&w_id) - .fetch_optional(&mut *tx) - .await?; - - tx.commit().await?; - - let app = not_found_if_none(app_o, "App", path)?; - Ok(Json(app)) -} - -async fn get_app_w_draft( - authed: ApiAuthed, - Extension(user_db): Extension, - Path((w_id, path)): Path<(String, StripPath)>, -) -> JsonResult { - let path = path.to_path(); - let mut tx = user_db.begin(&authed).await?; - - let app_o = sqlx::query_as::<_, AppWithLastVersionAndDraft>( - r#"SELECT app.id, app.path, app.summary, app.versions, app.policy, app.custom_path, - app.extra_perms, app_version.value, - app_version.created_at, app_version.created_by, - app.draft_only, draft.value as "draft" - from app - INNER JOIN app_version ON - app_version.id = app.versions[array_upper(app.versions, 1)] - LEFT JOIN draft ON - app.path = draft.path AND draft.workspace_id = $2 AND draft.typ = 'app' - WHERE app.path = $1 AND app.workspace_id = $2"#, - ) - .bind(path.to_owned()) - .bind(&w_id) - .fetch_optional(&mut *tx) - .await?; - tx.commit().await?; - - let app = not_found_if_none(app_o, "App", path)?; - Ok(Json(app)) -} - -async fn get_app_history( - authed: ApiAuthed, - Extension(user_db): Extension, - Path((w_id, path)): Path<(String, StripPath)>, -) -> JsonResult> { - let mut tx = user_db.begin(&authed).await?; - let query_result = sqlx::query!( - "SELECT a.id as app_id, av.id as version_id, dm.deployment_msg as deployment_msg - FROM app a LEFT JOIN app_version av ON a.id = av.app_id LEFT JOIN deployment_metadata dm ON av.id = dm.app_version - WHERE a.workspace_id = $1 AND a.path = $2 - ORDER BY created_at DESC", - w_id, - path.to_path(), - ).fetch_all(&mut *tx).await?; - tx.commit().await?; - - let result: Vec = query_result - .into_iter() - .map(|row| AppHistory { - app_id: row.app_id, - version: row.version_id, - deployment_msg: row.deployment_msg, - }) - .collect(); - return Ok(Json(result)); -} - -async fn get_latest_version( - authed: ApiAuthed, - Extension(user_db): Extension, - Path((w_id, path)): Path<(String, StripPath)>, -) -> JsonResult> { - let mut tx = user_db.begin(&authed).await?; - let row = sqlx::query!( - "SELECT a.id as app_id, av.id as version_id, dm.deployment_msg as deployment_msg - FROM app a LEFT JOIN app_version av ON a.id = av.app_id LEFT JOIN deployment_metadata dm ON av.id = dm.app_version - WHERE a.workspace_id = $1 AND a.path = $2 - ORDER BY created_at DESC", - w_id, - path.to_path(), - ).fetch_optional(&mut *tx).await?; - tx.commit().await?; - - if let Some(row) = row { - let result = AppHistory { - app_id: row.app_id, - version: row.version_id, - deployment_msg: row.deployment_msg, - }; - - return Ok(Json(Some(result))); - } else { - return Ok(Json(None)); - } -} - -async fn update_app_history( - authed: ApiAuthed, - Extension(user_db): Extension, - Path((w_id, app_id, app_version)): Path<(String, i64, i64)>, - Json(app_history_update): Json, -) -> Result<()> { - let mut tx = user_db.begin(&authed).await?; - let app_path = sqlx::query_scalar!("SELECT path FROM app WHERE id = $1", app_id) - .fetch_optional(&mut *tx) - .await?; - - if app_path.is_none() { - tx.commit().await?; - return Err(Error::NotFound( - format!("App with ID {app_id} not found").to_string(), - )); - } - - sqlx::query!( - "INSERT INTO deployment_metadata (workspace_id, path, app_version, deployment_msg) VALUES ($1, $2, $3, $4) ON CONFLICT (workspace_id, path, app_version) WHERE app_version IS NOT NULL DO UPDATE SET deployment_msg = $4", - w_id, - app_path.unwrap(), - app_version, - app_history_update.deployment_msg, - ) - .fetch_optional(&mut *tx) - .await?; - tx.commit().await?; - return Ok(()); -} - -async fn custom_path_exists( - Extension(db): Extension, - Path((w_id, custom_path)): Path<(String, String)>, -) -> JsonResult { - let exists = - sqlx::query_scalar!( - "SELECT EXISTS(SELECT 1 FROM app WHERE custom_path = $1 AND ($2::TEXT IS NULL OR workspace_id = $2))", - custom_path, - if *CLOUD_HOSTED { Some(&w_id) } else { None } - ) - .fetch_one(&db) - .await?.unwrap_or(false); - Ok(Json(exists)) -} - -async fn get_app_by_id( - authed: ApiAuthed, - Extension(user_db): Extension, - Path((w_id, id)): Path<(String, i64)>, -) -> JsonResult { - let mut tx = user_db.begin(&authed).await?; - - let app_o = sqlx::query_as::<_, AppWithLastVersion>( - "SELECT app.id, app.path, app.summary, app.versions, app.policy, app.custom_path, - app.extra_perms, app_version.value, - app_version.created_at, app_version.created_by from app, app_version - WHERE app_version.id = $1 AND app.id = app_version.app_id AND app.workspace_id = $2", - ) - .bind(&id) - .bind(&w_id) - .fetch_optional(&mut *tx) - .await?; - tx.commit().await?; - - let app = not_found_if_none(app_o, "App", id.to_string())?; - Ok(Json(app)) -} - -async fn get_public_app_by_secret( - OptAuthed(opt_authed): OptAuthed, - Extension(user_db): Extension, - Extension(db): Extension, - Path((w_id, secret)): Path<(String, String)>, -) -> JsonResult { - let mc = build_crypt(&db, &w_id).await?; - - let decrypted = mc - .decrypt_bytes_to_bytes(&(hex::decode(secret)?)) - .map_err(|e| Error::InternalErr(e.to_string()))?; - let bytes = str::from_utf8(&decrypted).map_err(to_anyhow)?; - - let id: i64 = bytes.parse().map_err(to_anyhow)?; - - let app_o = sqlx::query_as::<_, AppWithLastVersion>( - "SELECT app.id, app.path, app.summary, app.versions, app.policy, app.custom_path, - null as extra_perms, coalesce(app_version_lite.value::json, app_version.value::json) as value, - app_version.created_at, app_version.created_by from app, app_version - LEFT JOIN app_version_lite ON app_version_lite.id = app_version.id - WHERE app.id = $1 AND app.workspace_id = $2 AND app_version.id = app.versions[array_upper(app.versions, 1)]") - .bind(&id) - .bind(&w_id) - .fetch_optional(&db) - .await?; - - let app = not_found_if_none(app_o, "App", id.to_string())?; - - let policy = serde_json::from_str::(app.policy.0.get()).map_err(to_anyhow)?; - - if matches!(policy.execution_mode, ExecutionMode::Anonymous) { - return Ok(Json(app)); - } - - if opt_authed.is_none() { - { - return Err(Error::NotAuthorized( - "App visibility does not allow public access and you are not logged in".to_string(), - )); - } - } else { - let authed = opt_authed.unwrap(); - let mut tx = user_db.begin(&authed).await?; - let is_visible = sqlx::query_scalar!( - "SELECT EXISTS(SELECT 1 FROM app WHERE id = $1 AND workspace_id = $2)", - id, - &w_id - ) - .fetch_one(&mut *tx) - .await?; - tx.commit().await?; - if !is_visible.unwrap_or(false) { - return Err(Error::NotAuthorized( - "App visibility does not allow public access and you are logged in but you have no read-access to that app".to_string(), - )); - } - } - - Ok(Json(app)) -} - -async fn get_public_resource( - Extension(db): Extension, - Path((w_id, path)): Path<(String, StripPath)>, -) -> JsonResult> { - let path = path.to_path(); - if !path.starts_with("f/app_themes/") { - return Err(Error::BadRequest( - "Only app themes are public resources".to_string(), - )); - } - let res = sqlx::query_scalar!( - "SELECT value from resource WHERE path = $1 AND workspace_id = $2", - path.to_owned(), - &w_id - ) - .fetch_optional(&db) - .await? - .flatten(); - Ok(Json(res)) -} - -async fn get_secret_id( - authed: ApiAuthed, - Extension(user_db): Extension, - Extension(db): Extension, - Path((w_id, path)): Path<(String, StripPath)>, -) -> Result { - let path = path.to_path(); - let mut tx = user_db.begin(&authed).await?; - - let id_o = sqlx::query_scalar!( - "SELECT app.id FROM app - WHERE app.path = $1 AND app.workspace_id = $2", - path, - &w_id - ) - .fetch_optional(&mut *tx) - .await?; - tx.commit().await?; - - let id = not_found_if_none(id_o, "App", path.to_string())?; - - let mc = build_crypt(&db, &w_id).await?; - - let hx = hex::encode(mc.encrypt_str_to_bytes(id.to_string())); - - Ok(hx) -} - -async fn create_app( - authed: ApiAuthed, - Extension(user_db): Extension, - Extension(db): Extension, - Extension(webhook): Extension, - Path(w_id): Path, - Json(mut app): Json, -) -> Result<(StatusCode, String)> { - let mut tx = user_db.clone().begin(&authed).await?; - - app.policy.on_behalf_of = Some(username_to_permissioned_as(&authed.username)); - app.policy.on_behalf_of_email = Some(authed.email.clone()); - - if &app.path == "" { - return Err(Error::BadRequest("App path cannot be empty".to_string())); - } - - let exists = sqlx::query_scalar!( - "SELECT EXISTS(SELECT 1 FROM app WHERE path = $1 AND workspace_id = $2)", - &app.path, - w_id - ) - .fetch_one(&mut *tx) - .await? - .unwrap_or(false); - - if exists { - return Err(Error::BadRequest(format!( - "App with path {} already exists", - &app.path - ))); - } - - if let Some(custom_path) = &app.custom_path { - require_admin(authed.is_admin, &authed.username)?; - - let exists = sqlx::query_scalar!( - "SELECT EXISTS(SELECT 1 FROM app WHERE custom_path = $1 AND ($2::TEXT IS NULL OR workspace_id = $2))", - custom_path, - if *CLOUD_HOSTED { Some(&w_id) } else { None } - ) - .fetch_one(&mut *tx) - .await?.unwrap_or(false); - - if exists { - return Err(Error::BadRequest(format!( - "App with custom path {} already exists", - custom_path - ))); - } - } - - sqlx::query!( - "DELETE FROM draft WHERE path = $1 AND workspace_id = $2 AND typ = 'app'", - &app.path, - &w_id - ) - .execute(&mut *tx) - .await?; - - let id = sqlx::query_scalar!( - "INSERT INTO app - (workspace_id, path, summary, policy, versions, draft_only, custom_path) - VALUES ($1, $2, $3, $4, '{}', $5, $6) RETURNING id", - w_id, - app.path, - app.summary, - json!(app.policy), - app.draft_only, - app.custom_path - .map(|s| if s.is_empty() { None } else { Some(s) }) - .flatten() - ) - .fetch_one(&mut *tx) - .await?; - - let v_id = sqlx::query_scalar!( - "INSERT INTO app_version - (app_id, value, created_by) - VALUES ($1, $2::text::json, $3) RETURNING id", - id, - //to preserve key orders - serde_json::to_string(&app.value).unwrap(), - authed.username, - ) - .fetch_one(&mut *tx) - .await?; - - sqlx::query!( - "UPDATE app SET versions = array_append(versions, $1::bigint) WHERE id = $2", - v_id, - id - ) - .execute(&mut *tx) - .await?; - - audit_log( - &mut *tx, - &authed, - "apps.create", - ActionKind::Create, - &w_id, - Some(&app.path), - None, - ) - .await?; - - let mut args: HashMap> = HashMap::new(); - if let Some(dm) = app.deployment_message { - args.insert("deployment_message".to_string(), to_raw_value(&dm)); - } - - let tx = PushIsolationLevel::Transaction(tx); - let (dependency_job_uuid, new_tx) = push( - &db, - tx, - &w_id, - JobPayload::AppDependencies { path: app.path.clone(), version: v_id }, - PushArgs { args: &args, extra: None }, - &authed.username, - &authed.email, - windmill_common::users::username_to_permissioned_as(&authed.username), - None, - None, - None, - None, - None, - false, - false, - None, - true, - None, - None, - None, - None, - Some(&authed.clone().into()), - ) - .await?; - tracing::info!("Pushed app dependency job {}", dependency_job_uuid); - - new_tx.commit().await?; - - webhook.send_message( - w_id.clone(), - WebhookMessage::CreateApp { workspace: w_id, path: app.path.clone() }, - ); - - Ok((StatusCode::CREATED, app.path)) -} - -async fn list_hub_apps(Extension(db): Extension) -> impl IntoResponse { - let (status_code, headers, body) = query_elems_from_hub( - &HTTP_CLIENT, - &format!("{}/searchUiData?approved=true", *HUB_BASE_URL.read().await), - None, - &db, - ) - .await?; - Ok::<_, Error>((status_code, headers, body)) -} - -pub async fn get_hub_app_by_id( - Path(id): Path, - Extension(db): Extension, -) -> JsonResult> { - let value = http_get_from_hub( - &HTTP_CLIENT, - &format!("{}/apps/{}/json", *HUB_BASE_URL.read().await, id), - false, - None, - Some(&db), - ) - .await? - .json() - .await - .map_err(to_anyhow)?; - Ok(Json(value)) -} - -async fn delete_app( - authed: ApiAuthed, - Extension(db): Extension, - Extension(user_db): Extension, - Extension(webhook): Extension, - Path((w_id, path)): Path<(String, StripPath)>, -) -> Result { - let path = path.to_path(); - - if path == "g/all/setup_app" && w_id == "admins" { - return Err(Error::BadRequest( - "Cannot delete the global setup app".to_string(), - )); - } - - let mut tx = user_db.begin(&authed).await?; - - sqlx::query!( - "DELETE FROM draft WHERE path = $1 AND workspace_id = $2 AND typ = 'app'", - path, - &w_id - ) - .execute(&mut *tx) - .await?; - - sqlx::query!( - "DELETE FROM app WHERE path = $1 AND workspace_id = $2", - path, - &w_id - ) - .execute(&mut *tx) - .await?; - - audit_log( - &mut *tx, - &authed, - "apps.delete", - ActionKind::Delete, - &w_id, - Some(path), - None, - ) - .await?; - tx.commit().await?; - - handle_deployment_metadata( - &authed.email, - &authed.username, - &db, - &w_id, - DeployedObject::App { - path: path.to_string(), - parent_path: Some(path.to_string()), - version: 0, // dummy version as it will not get inserted in db - }, - Some(format!("App '{}' deleted", path)), - true, - ) - .await?; - - sqlx::query!( - "DELETE FROM deployment_metadata WHERE path = $1 AND workspace_id = $2 AND app_version IS NOT NULL", - path, - w_id - ) - .execute(&db) - .await - .map_err(|e| { - Error::InternalErr(format!( - "error deleting deployment metadata for script with path {path} in workspace {w_id}: {e:#}" - )) - })?; - - webhook.send_message( - w_id.clone().clone(), - WebhookMessage::DeleteApp { workspace: w_id, path: path.to_owned() }, - ); - - Ok(format!("app {} deleted", path)) -} - -async fn update_app( - authed: ApiAuthed, - Extension(db): Extension, - Extension(user_db): Extension, - Extension(webhook): Extension, - Path((w_id, path)): Path<(String, StripPath)>, - Json(ns): Json, -) -> Result { - use sql_builder::prelude::*; - - let path = path.to_path(); - - let mut tx = user_db.clone().begin(&authed).await?; - - let npath = if ns.policy.is_some() - || ns.path.is_some() - || ns.summary.is_some() - || ns.custom_path.is_some() - { - let mut sqlb = SqlBuilder::update_table("app"); - sqlb.and_where_eq("path", "?".bind(&path)); - sqlb.and_where_eq("workspace_id", "?".bind(&w_id)); - - sqlb.set("draft_only", "NULL"); - if let Some(npath) = &ns.path { - if npath != path { - require_owner_of_path(&authed, path)?; - - let exists = sqlx::query_scalar!( - "SELECT EXISTS(SELECT 1 FROM app WHERE path = $1 AND workspace_id = $2)", - npath, - w_id - ) - .fetch_one(&mut *tx) - .await? - .unwrap_or(false); - - if exists { - return Err(Error::BadRequest(format!( - "App with path {} already exists", - npath - ))); - } - } - sqlb.set_str("path", npath); - } - - if let Some(nsummary) = &ns.summary { - sqlb.set_str("summary", nsummary); - } - - if let Some(ncustom_path) = &ns.custom_path { - require_admin(authed.is_admin, &authed.username)?; - - if ncustom_path.is_empty() { - sqlb.set("custom_path", "NULL"); - } else { - let exists = sqlx::query_scalar!( - "SELECT EXISTS(SELECT 1 FROM app WHERE custom_path = $1 AND ($2::TEXT IS NULL OR workspace_id = $2) AND NOT (path = $3 AND workspace_id = $4))", - ncustom_path, - if *CLOUD_HOSTED { Some(&w_id) } else { None }, - path, - w_id - ) - .fetch_one(&mut *tx) - .await?.unwrap_or(false); - - if exists { - return Err(Error::BadRequest(format!( - "App with custom path {} already exists", - ncustom_path - ))); - } - sqlb.set_str("custom_path", ncustom_path); - } - } - - if let Some(mut npolicy) = ns.policy { - npolicy.on_behalf_of = Some(username_to_permissioned_as(&authed.username)); - npolicy.on_behalf_of_email = Some(authed.email.clone()); - sqlb.set( - "policy", - quote(serde_json::to_string(&json!(npolicy)).map_err(|e| { - Error::BadRequest(format!("failed to serialize policy: {}", e)) - })?), - ); - } - - sqlb.returning("path"); - - let sql = sqlb.sql().map_err(|e| Error::InternalErr(e.to_string()))?; - let npath_o: Option = sqlx::query_scalar(&sql).fetch_optional(&mut *tx).await?; - not_found_if_none(npath_o, "App", path)? - } else { - path.to_owned() - }; - let v_id = if let Some(nvalue) = &ns.value { - let app_id = sqlx::query_scalar!( - "SELECT id FROM app WHERE path = $1 AND workspace_id = $2", - npath, - w_id - ) - .fetch_one(&mut *tx) - .await?; - - let v_id = sqlx::query_scalar!( - "INSERT INTO app_version - (app_id, value, created_by) - VALUES ($1, $2::text::json, $3) RETURNING id", - app_id, - //to preserve key orders - serde_json::to_string(&nvalue).unwrap(), - authed.username, - ) - .fetch_one(&mut *tx) - .await?; - - sqlx::query!( - "UPDATE app SET versions = array_append(versions, $1::bigint) WHERE path = $2 AND workspace_id = $3", - v_id, - npath, - w_id - ) - .execute(&mut *tx) - .await?; - v_id - } else { - let v_id = sqlx::query_scalar!( - "SELECT app.versions[array_upper(app.versions, 1)] FROM app WHERE path = $1 AND workspace_id = $2", - npath, - w_id - ) - .fetch_one(&mut *tx) - .await?; - if let Some(v_id) = v_id { - v_id - } else { - return Err(Error::BadRequest(format!( - "App with path {} not found", - npath - ))); - } - }; - - sqlx::query!( - "DELETE FROM draft WHERE path = $1 AND workspace_id = $2 AND typ = 'app'", - path, - &w_id - ) - .execute(&mut *tx) - .await?; - - audit_log( - &mut *tx, - &authed, - "apps.update", - ActionKind::Update, - &w_id, - Some(&npath), - None, - ) - .await?; - - let tx = PushIsolationLevel::Transaction(tx); - let mut args: HashMap> = HashMap::new(); - if let Some(dm) = ns.deployment_message { - args.insert("deployment_message".to_string(), to_raw_value(&dm)); - } - args.insert("parent_path".to_string(), to_raw_value(&path)); - - let (dependency_job_uuid, new_tx) = push( - &db, - tx, - &w_id, - JobPayload::AppDependencies { path: npath.clone(), version: v_id }, - PushArgs { args: &args, extra: None }, - &authed.username, - &authed.email, - windmill_common::users::username_to_permissioned_as(&authed.username), - None, - None, - None, - None, - None, - false, - false, - None, - true, - None, - None, - None, - None, - Some(&authed.clone().into()), - ) - .await?; - tracing::info!("Pushed app dependency job {}", dependency_job_uuid); - new_tx.commit().await?; - - webhook.send_message( - w_id.clone(), - WebhookMessage::UpdateApp { - workspace: w_id, - old_path: path.to_owned(), - new_path: npath.clone(), - }, - ); - - Ok(format!("app {} updated (npath: {:?})", path, npath)) -} - -#[derive(Debug, Deserialize, Clone)] -pub struct ExecuteApp { - /// The app version to execute. Fallback to `path` if not provided. - pub version: Option, - /// The app script id (from the `app_script` table) to execute. - pub id: Option, - pub args: HashMap>, - // - script: script/ - // - flow: flow/ - pub path: Option, - pub component: String, - pub raw_code: Option, - // if set, the app is executed as viewer with the given static fields - pub force_viewer_static_fields: Option, - pub force_viewer_one_of_fields: Option, - pub force_viewer_allow_user_resources: Option, -} - -fn digest(code: &str) -> String { - let mut hasher = Sha256::new(); - hasher.update(code); - let result = hasher.finalize(); - format!("rawscript/{:x}", result) -} - -async fn get_on_behalf_details_from_policy_and_authed( - policy: &Policy, - opt_authed: &Option, -) -> Result<(String, String, String)> { - let (username, permissioned_as, email) = match policy.execution_mode { - ExecutionMode::Anonymous => { - let username = opt_authed - .as_ref() - .map(|a| a.username.clone()) - .unwrap_or_else(|| "anonymous".to_string()); - let (permissioned_as, email) = get_on_behalf_of(&policy)?; - (username, permissioned_as, email) - } - ExecutionMode::Publisher => { - let username = opt_authed - .as_ref() - .map(|a| a.username.clone()) - .ok_or_else(|| { - Error::BadRequest( - "publisher execution mode requires authentication".to_string(), - ) - })?; - let (permissioned_as, email) = get_on_behalf_of(&policy)?; - (username, permissioned_as, email) - } - ExecutionMode::Viewer => { - let (username, email) = opt_authed - .as_ref() - .map(|a| (a.username.clone(), a.email.clone())) - .ok_or_else(|| { - Error::BadRequest("Required to be authed in viewer mode".to_string()) - })?; - ( - username.clone(), - username_to_permissioned_as(&username), - email, - ) - } - }; - - Ok((username, permissioned_as, email)) -} - -/// Convert the triggerables from the old format to the new format. -fn empty_triggerables(mut policy: Policy) -> Policy { - use std::mem::take; - if let Some(triggerables) = take(&mut policy.triggerables) { - let mut triggerables_v2 = take(&mut policy.triggerables_v2).unwrap_or_default(); - for (k, static_inputs) in triggerables.into_iter() { - triggerables_v2.insert( - k, - PolicyTriggerableInputs { static_inputs, ..Default::default() }, - ); - } - policy.triggerables_v2 = Some(triggerables_v2); - } - policy -} - -async fn execute_component( - OptAuthed(opt_authed): OptAuthed, - Extension(db): Extension, - Extension(user_db): Extension, - Path((w_id, path)): Path<(String, StripPath)>, - Json(payload): Json, -) -> Result { - match (payload.path.is_some(), payload.raw_code.is_some()) { - (false, false) => { - return Err(Error::BadRequest( - "path or raw_code is required".to_string(), - )) - } - (true, true) => { - return Err(Error::BadRequest( - "path and raw_code cannot be set at the same time".to_string(), - )) - } - _ => {} - }; - - let path = path.to_path(); - let (arc_policy, policy): (Arc, Policy); - let policy_triggerables_default = Default::default(); - - // Two cases here: - // 1. The component is executed from the editor (i.e. in "preview" mode), then: - // - The policy is set to default (in `Viewer` execution mode). - // - The policy triggerables are built by the frontend and retrieved from the request - // payload. - // - In case of inline script, the `RawCode` from the request is pushed as is to the - // job queue. - // 2. Otherwise (i.e. "run" mode): - // - The policy and triggerables are fetched from the database. - // - In case of inline script, if an entry exists in the `app_script` table, push - // an `AppScript` job payload, as in (.1) otherwise. - let (policy, policy_triggerables) = match payload { - // 1. "preview" mode. - ExecuteApp { - force_viewer_static_fields: Some(static_inputs), - force_viewer_one_of_fields: Some(one_of_inputs), - force_viewer_allow_user_resources: Some(allow_user_resources), - .. - } => ( - &Policy { execution_mode: ExecutionMode::Viewer, ..Default::default() }, - &PolicyTriggerableInputs { static_inputs, one_of_inputs, allow_user_resources }, - ), - // 2. "run" mode. - _ => { - // Policy is fetched from the database on app `path` and `workspace_id`. - let policy_fut = sqlx::query_scalar!( - "SELECT policy as \"policy: sqlx::types::Json>\" - FROM app WHERE app.path = $1 AND app.workspace_id = $2 LIMIT 1", - path, - &w_id, - ) - .fetch_optional(&db) - .map_err(Into::::into) - .map(|policy_o| Result::Ok(not_found_if_none(policy_o?, "App", path)?)) - .map(|policy| Result::Ok(serde_json::from_str(policy?.get())?)) - .map_ok(empty_triggerables); - - // 1. The app `version` is provided: cache the fetched policy. - // 2. Otherwise, always fetch the policy from the database. - let policy = if let Some(id) = payload.version { - let cache = cache::anon!({ u64 => Arc } in "policy" <= 1000); - arc_policy = policy_fut.map_ok(Arc::new).cached(cache, id as u64).await?; - &*arc_policy - } else { - policy = policy_fut.await?; - &policy - }; - - // Compute the path for the triggerables map: - // - flow: `flow/` - // - script: `script/` - // - inline script: `rawscript/` - let path = match &payload { - // flow or script: just use the `payload.path`. - ExecuteApp { path: Some(path), .. } => path, - // inline script: without entry in the `app_script` table. - ExecuteApp { raw_code: Some(raw_code), id: None, .. } => &digest(&raw_code.content), - // inline script: with an entry in the `app_script` table. - ExecuteApp { raw_code: Some(_), id: Some(id), .. } => { - let cache = cache::anon!({ u64 => Arc } in "appscriptpath" <= 10000); - // `id` is unique, cache the result. - &*sqlx::query_scalar!( - "SELECT format('rawscript/%s', code_sha256) as \"path!: String\" - FROM app_script WHERE id = $1 LIMIT 1", - id - ) - .fetch_one(&db) - .map_err(Into::::into) - .map_ok(Arc::new) - .cached(cache, *id as u64) - .await? - } - _ => unreachable!(), - }; - - // Retrieve the triggerables from the policy on `path` or `:`. - let triggerables_v2 = policy - .triggerables_v2 - .as_ref() - .ok_or_else(|| Error::BadRequest(format!("Policy is missing triggerables")))?; - let policy_triggerables = triggerables_v2 - .get(path) // start with `path` in case we can avoid the next` format!`. - .or_else(|| triggerables_v2.get(&format!("{}:{}", payload.component, &path))) - .or(match policy.execution_mode { - ExecutionMode::Viewer => Some(&policy_triggerables_default), - _ => None, - }) - .ok_or_else(|| Error::BadRequest(format!("Path {path} forbidden by policy")))?; - - (policy, policy_triggerables) - } - }; - - let (username, permissioned_as, email) = - get_on_behalf_details_from_policy_and_authed(&policy, &opt_authed).await?; - - let (args, job_id) = build_args( - policy, - policy_triggerables, - payload.args, - opt_authed.as_ref(), - &user_db, - &db, - &w_id, - ) - .await?; - - let (job_payload, tag) = match (payload.path, payload.raw_code, payload.id) { - // flow or script: - (Some(path), None, None) => get_payload_tag_from_prefixed_path(&path, &db, &w_id).await?, - // inline script: in "preview" mode or without entry in the `app_script` table. - (None, Some(raw_code), None) => (JobPayload::Code(raw_code), None), - // inline script: in "run" mode and with an entry in the `app_script` table. - (None, Some(RawCode { language, path, cache_ttl, .. }), Some(id)) => ( - JobPayload::AppScript { id: AppScriptId(id), cache_ttl, language, path }, - None, - ), - _ => unreachable!(), - }; - let tx = windmill_queue::PushIsolationLevel::IsolatedRoot(db.clone()); - - let (uuid, tx) = push( - &db, - tx, - &w_id, - job_payload, - PushArgs { args: &args.args, extra: args.extra }, - &username, - &email, - permissioned_as, - None, - None, - None, - None, - job_id, - false, - false, - None, - true, - tag, - None, - None, - None, - None, - ) - .await?; - tx.commit().await?; - - Ok(uuid.to_string()) -} - -#[cfg(not(feature = "parquet"))] -async fn upload_s3_file_from_app() -> Result<()> { - return Err(Error::BadRequest( - "This endpoint requires the parquet feature to be enabled".to_string(), - )); -} - -#[cfg(feature = "parquet")] -#[derive(Debug, Deserialize, Clone)] -struct UploadFileToS3Query { - file_key: Option, - file_extension: Option, - s3_resource_path: Option, - content_type: Option, - content_disposition: Option, - force_viewer_file_key_regex: Option, - force_viewer_allow_user_resources: Option, - force_viewer_allow_workspace_resource: Option, - force_viewer_allowed_resources: Option, -} - -#[cfg(feature = "parquet")] -async fn upload_s3_file_from_app( - OptAuthed(opt_authed): OptAuthed, - Extension(db): Extension, - Path((w_id, path)): Path<(String, StripPath)>, - Query(query): Query, - request: axum::extract::Request, -) -> JsonResult { - let policy = if let Some(file_key_regex) = query.force_viewer_file_key_regex { - Some(Policy { - execution_mode: ExecutionMode::Viewer, - triggerables: None, - triggerables_v2: None, - on_behalf_of: None, - on_behalf_of_email: None, - s3_inputs: Some(vec![S3Input { - file_key_regex: file_key_regex, - allow_user_resources: query.force_viewer_allow_user_resources.unwrap_or(false), - allow_workspace_resource: query - .force_viewer_allow_workspace_resource - .unwrap_or(false), - allowed_resources: query - .force_viewer_allowed_resources - .map(|s| s.split(',').map(|s| s.to_string()).collect()) - .unwrap_or_default(), - }]), - }) - } else { - let policy_o = sqlx::query_scalar!( - "SELECT policy from app WHERE path = $1 AND workspace_id = $2", - &path.0, - &w_id - ) - .fetch_optional(&db) - .await?; - - policy_o - .map(|p| serde_json::from_value::(p).map_err(to_anyhow)) - .transpose()? - }; - - let user_db = UserDB::new(db.clone()); - - let (s3_resource_opt, file_key) = if policy.as_ref().is_some_and(|p| p.s3_inputs.is_some()) { - let policy = policy.unwrap(); - let s3_inputs = policy.s3_inputs.as_ref().unwrap(); - - let (username, permissioned_as, email) = - get_on_behalf_details_from_policy_and_authed(&policy, &opt_authed).await?; - - let on_behalf_authed = fetch_api_authed_from_permissioned_as( - permissioned_as, - email, - &w_id, - &db, - Some(username), - ) - .await?; - - if let Some(file_key) = query.file_key { - // file key is provided => requires workspace, user or list policy and must match the regex - let matching_s3_inputs = if let Some(ref s3_resource_path) = query.s3_resource_path { - s3_inputs - .iter() - .filter(|s3_input| { - s3_input.allowed_resources.contains(s3_resource_path) - || s3_input.allow_user_resources - }) - .sorted_by_key(|i| i.allow_user_resources) // consider user resources last - .collect::>() - } else { - s3_inputs - .iter() - .filter(|s3_input| s3_input.allow_workspace_resource) - .collect::>() - }; - - let matched_input = matching_s3_inputs.iter().find(|s3_input| { - match Regex::new(&s3_input.file_key_regex) { - Ok(re) => re.is_match(&file_key), - Err(e) => { - tracing::error!("Error compiling regex: {}", e); - false - } - } - }); - - if let Some(matched_input) = matched_input { - if let Some(ref s3_resource_path) = query.s3_resource_path { - if matched_input.allow_user_resources { - if let Some(authed) = opt_authed { - ( - Some( - get_s3_resource( - &authed, - &db, - Some(user_db), - "", - &w_id, - s3_resource_path, - None, - None, - ) - .await?, - ), - file_key, - ) - } else { - return Err(Error::BadRequest( - "User resources are not allowed without being logged in" - .to_string(), - )); - } - } else { - ( - Some( - get_s3_resource( - &on_behalf_authed, - &db, - Some(user_db), - "", - &w_id, - s3_resource_path, - None, - None, - ) - .await?, - ), - file_key, - ) - } - } else { - let (_, s3_resource_opt) = - get_workspace_s3_resource(&on_behalf_authed, &db, None, "", &w_id, None) - .await?; - (s3_resource_opt, file_key) - } - } else { - return Err(Error::BadRequest( - "No matching s3 resource found for the given file key".to_string(), - )); - } - } else { - // no file key => requires unnamed upload policy => allow workspace resource and file_key_regex is empty - let has_unnamed_policy = s3_inputs.iter().any(|s3_input| { - s3_input.allow_workspace_resource && s3_input.file_key_regex.is_empty() - }); - - if !has_unnamed_policy { - return Err(Error::BadRequest( - "no policy found for unnamed s3 file uplooad".to_string(), - )); - } - - // for now, we place all files into `windmill_uploads` folder with a random name - // TODO: make the folder configurable via the workspace settings - let file_key = get_random_file_name(query.file_extension); - - let (_, s3_resource_opt) = - get_workspace_s3_resource(&on_behalf_authed, &db, None, "", &w_id, None).await?; - - (s3_resource_opt, file_key) - } - } else { - // backward compatibility (no policy) - // if no policy but logged in, use the user's auth to get the s3 resource - if let Some(authed) = opt_authed { - let file_key = query - .file_key - .unwrap_or_else(|| get_random_file_name(query.file_extension)); - - if let Some(ref s3_resource_path) = query.s3_resource_path { - ( - Some( - get_s3_resource( - &authed, - &db, - Some(user_db), - "", - &w_id, - s3_resource_path, - None, - None, - ) - .await?, - ), - file_key, - ) - } else { - let (_, s3_resource) = - get_workspace_s3_resource(&authed, &db, None, "", &w_id, None).await?; - - (s3_resource, file_key) - } - } else { - return Err(Error::BadRequest("Missing s3 policy".to_string())); - } - }; - - let s3_resource = s3_resource_opt.ok_or(Error::InternalErr( - "No files storage resource defined at the workspace level".to_string(), - ))?; - let s3_client = build_object_store_client(&s3_resource).await?; - - let options = Attributes::from_iter(vec![ - ( - Attribute::ContentType, - query.content_type.unwrap_or_else(|| { - mime_guess::from_path(&file_key) - .first_or_octet_stream() - .to_string() - }), - ), - ( - Attribute::ContentDisposition, - query.content_disposition.unwrap_or("inline".to_string()), - ), - ]) - .into(); - - upload_file_from_req(s3_client, &file_key, request, options).await?; - - return Ok(Json(UploadFileResponse { file_key })); -} - -fn get_on_behalf_of(policy: &Policy) -> Result<(String, String)> { - let permissioned_as = policy - .on_behalf_of - .as_ref() - .ok_or_else(|| { - Error::BadRequest( - "on_behalf_of is missing in the app policy and is required for anonymous execution" - .to_string(), - ) - })? - .to_string(); - let email = policy - .on_behalf_of_email - .as_ref() - .ok_or_else(|| { - Error::BadRequest( - "on_behalf_of_email is missing in the app policy and is required for anonymous execution" - .to_string(), - ) - })? - .to_string(); - Ok((permissioned_as, email)) -} - -pub async fn require_is_writer(authed: &ApiAuthed, path: &str, w_id: &str, db: DB) -> Result<()> { - return crate::users::require_is_writer( - authed, - path, - w_id, - db, - "SELECT extra_perms FROM app WHERE path = $1 AND workspace_id = $2", - "app", - ) - .await; -} - -async fn exists_app( - Extension(db): Extension, - Path((w_id, path)): Path<(String, StripPath)>, -) -> JsonResult { - let path = path.to_path(); - let exists = sqlx::query_scalar!( - "SELECT EXISTS(SELECT 1 FROM app WHERE path = $1 AND workspace_id = $2)", - path, - w_id - ) - .fetch_one(&db) - .await? - .unwrap_or(false); - - Ok(Json(exists)) -} - -async fn build_args( - policy: &Policy, - PolicyTriggerableInputs { - static_inputs, - one_of_inputs, - allow_user_resources, - }: &PolicyTriggerableInputs, - mut args: HashMap>, - authed: Option<&ApiAuthed>, - user_db: &UserDB, - db: &DB, - w_id: &str, -) -> Result<(PushArgsOwned, Option)> { - let mut job_id: Option = None; - let mut safe_args = HashMap::>::new(); - - // tracing::error!("{:?}", allow_user_resources); - for k in allow_user_resources.iter() { - if let Some(arg_val) = args.get(k) { - let key = serde_json::from_str::(arg_val.get()).ok(); - if let Some(path) = - key.and_then(|x| x.clone().strip_prefix("$res:").map(|x| x.to_string())) - { - if let Some(authed) = authed { - let res = get_resource_value_interpolated_internal( - authed, - Some(user_db.clone()), - db, - w_id, - &path, - None, - "", - ) - .await?; - if res.is_none() { - return Err(Error::BadRequest(format!( - "Resource {} not found or not allowed for viewer", - path - ))); - } - let job_id = if let Some(job_id) = job_id { - job_id - } else { - job_id = Some(ulid::Ulid::new().into()); - job_id.unwrap() - }; - let mc = build_crypt_with_key_suffix(&db, &w_id, &job_id.to_string()).await?; - let encrypted = encrypt(&mc, to_raw_value(&res.unwrap()).get()); - safe_args.insert( - k.to_string(), - to_raw_value(&format!("$encrypted:{encrypted}")), - ); - } else { - return Err(Error::BadRequest( - "User resources are not allowed without being logged in".to_string(), - )); - } - } - } - } - - for (k, v) in one_of_inputs { - if safe_args.contains_key(k) { - continue; - } - if let Some(arg_val) = args.get(k) { - let arg_str = arg_val.get(); - - let options_str_vec = v.iter().map(|x| x.get()).collect::>(); - if options_str_vec.contains(&arg_str) { - safe_args.insert(k.to_string(), arg_val.clone()); - args.remove(k); - continue; - } - - // check if multiselect - if let Ok(args_str_vec) = serde_json::from_str::>>(arg_val.get()) { - if args_str_vec - .iter() - .all(|x| options_str_vec.contains(&x.get())) - { - safe_args.insert(k.to_string(), arg_val.clone()); - args.remove(k); - continue; - } - } - - return Err(Error::BadRequest(format!( - "argument {} with value {} must be one of [{}]", - k, - arg_str, - options_str_vec.join(",") - ))); - } - } - - for (k, v) in args { - if safe_args.contains_key(&k) { - continue; - } - - let arg_str = v.get(); - - if arg_str.starts_with("\"$ctx:") { - let prop = arg_str.trim_start_matches("\"$ctx:").trim_end_matches("\""); - let value = match prop { - "username" => authed.as_ref().map(|a| { - serde_json::to_value(a.username_override.as_ref().unwrap_or(&a.username)) - }), - "email" => authed.as_ref().map(|a| serde_json::to_value(&a.email)), - "workspace" => Some(serde_json::to_value(&w_id)), - "groups" => authed.as_ref().map(|a| serde_json::to_value(&a.groups)), - "author" => Some(serde_json::to_value(&policy.on_behalf_of_email)), - _ => { - return Err(Error::BadRequest(format!( - "context variable {} not allowed", - prop - ))) - } - }; - safe_args.insert( - k.to_string(), - to_raw_value(&value.unwrap_or(Ok(serde_json::Value::Null)).map_err(|e| { - Error::InternalErr(format!("failed to serialize ctx variable for {}: {}", k, e)) - })?), - ); - } else if !arg_str.contains("\"$var:") && !arg_str.contains("\"$res:") { - safe_args.insert(k.to_string(), v); - } else { - safe_args.insert( - k.to_string(), - RawValue::from_string( - arg_str - .replace( - "$var:", - "The following variable has been omitted for security reasons: ", - ) - .replace( - "$res:", - "The following resource has been omitted for security reasons, to allow it, toggle: 'Allow resources from users' on that field input: ", - ), - ) - .map_err(|e| { - Error::InternalErr(format!( - "failed to remove sensitive variable(s)/resource(s) with error: {}", - e - )) - })?, - ); - } - } - let mut extra = HashMap::new(); - for (k, v) in static_inputs { - extra.insert(k.to_string(), v.to_owned()); - } - Ok(( - PushArgsOwned { extra: Some(extra), args: safe_args }, - job_id, - )) -} diff --git a/backend/windmill-common/src/apps.rs b/backend/windmill-common/src/apps.rs index c5b22d434c296..12a1011249c8b 100644 --- a/backend/windmill-common/src/apps.rs +++ b/backend/windmill-common/src/apps.rs @@ -24,7 +24,11 @@ pub struct ListAppQuery { pub with_deployment_msg: Option, } +#[derive(Deserialize)] +pub struct AppFile { + pub code: String, +} #[derive(Deserialize)] pub struct RawAppValue { - pub files: HashMap, + pub files: HashMap, } diff --git a/backend/windmill-worker/src/worker_lockfiles.rs b/backend/windmill-worker/src/worker_lockfiles.rs index d572464c8af9b..bf76c4e2be06a 100644 --- a/backend/windmill-worker/src/worker_lockfiles.rs +++ b/backend/windmill-worker/src/worker_lockfiles.rs @@ -49,6 +49,7 @@ use crate::{ bun_executor::gen_bun_lockfile, deno_executor::generate_deno_lock, go_executor::install_go_dependencies, }; +use crate::{get_common_bun_proc_envs, install_bun_lockfile}; pub async fn update_script_dependency_map( job_id: &Uuid, @@ -1544,7 +1545,8 @@ pub async fn handle_app_dependency_job( .execute(db) .await?; - if !raw_app { + if raw_app { + // tracing::error!("Raw app detected: {value:?}"); let app_value = serde_json::from_value::(value) .map_err(|e| Error::InternalErr(format!("Failed to parse raw app: {}", e)))?; upload_raw_app( @@ -1556,6 +1558,7 @@ pub async fn handle_app_dependency_job( db, worker_name, &mut Some(occupancy_metrics), + id, ) .await?; } @@ -1612,32 +1615,68 @@ async fn upload_raw_app( db: &sqlx::Pool, worker_name: &str, occupancy_metrics: &mut Option<&mut OccupancyMetrics>, + version: i64, ) -> Result<()> { for file in app_value.files.iter() { - write_file(&job_dir, file.0, file.1)?; - let mut cmd = tokio::process::Command::new("esbuild"); - cmd.env_clear() - .args("--bundle --minify --sourcemap --outfile=dist/".split(' ')) - .stdout(Stdio::piped()) - .stderr(Stdio::piped()); - let child = start_child_process(cmd, "esbuild").await?; - - crate::handle_child::handle_child( - &job.id, - db, - mem_peak, - canceled_by, - child, - false, - worker_name, - &job.workspace_id, - "esbuild", - Some(30), - false, - occupancy_metrics, - ) - .await?; + write_file(&job_dir, file.0, &file.1.code)?; } + let common_bun_proc_envs: HashMap = get_common_bun_proc_envs(None).await; + + install_bun_lockfile( + mem_peak, + canceled_by, + &job.id, + &job.workspace_id, + Some(db), + job_dir, + worker_name, + common_bun_proc_envs, + false, + occupancy_metrics, + ) + .await?; + let mut cmd = tokio::process::Command::new("esbuild"); + cmd.current_dir(job_dir) + .env_clear() + .args("--bundle --minify --outdir=dist/ index.tsx".split(' ')) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()); + let child = start_child_process(cmd, "esbuild").await?; + + crate::handle_child::handle_child( + &job.id, + db, + mem_peak, + canceled_by, + child, + false, + worker_name, + &job.workspace_id, + "esbuild", + Some(30), + false, + occupancy_metrics, + ) + .await?; + let output_dir = format!("{}/dist", job_dir); + let target_dir = format!("/home/rfiszel/wmill/{}/{}", job.workspace_id, version); + + tokio::fs::create_dir_all(&target_dir).await?; + + tracing::info!("Copying files from {} to {}", output_dir, target_dir); + + let index_ts = format!("{}/index.js", output_dir); + let index_css = format!("{}/index.css", output_dir); + + if tokio::fs::metadata(&index_ts).await.is_ok() { + tokio::fs::copy(&index_ts, format!("{}/index.js", target_dir)).await?; + } + + if tokio::fs::metadata(&index_css).await.is_ok() { + tokio::fs::copy(&index_css, format!("{}/index.css", target_dir)).await?; + } + // let file_path = format!("/home/rfiszel/wmill/{}/{}", job.workspace_id, version); + Ok(()) }