From 1d20dea6630407840fd058a8b51c410fae3fbe78 Mon Sep 17 00:00:00 2001 From: HugoCasa Date: Sun, 22 Dec 2024 05:52:23 +0100 Subject: [PATCH] fix(backend): timeout for bigquery/graphql/snowflake (#4965) --- .../windmill-worker/src/bigquery_executor.rs | 23 ++++++++++--------- backend/windmill-worker/src/common.rs | 16 +++++++++---- .../windmill-worker/src/graphql_executor.rs | 10 +++++--- .../windmill-worker/src/snowflake_executor.rs | 14 ++++++++--- 4 files changed, 41 insertions(+), 22 deletions(-) diff --git a/backend/windmill-worker/src/bigquery_executor.rs b/backend/windmill-worker/src/bigquery_executor.rs index 30c0f1d575617..e4674ed6ee387 100644 --- a/backend/windmill-worker/src/bigquery_executor.rs +++ b/backend/windmill-worker/src/bigquery_executor.rs @@ -2,6 +2,7 @@ use std::collections::HashMap; use futures::future::BoxFuture; use futures::{FutureExt, TryFutureExt}; +use reqwest::Client; use serde_json::{json, value::RawValue, Value}; use windmill_common::error::to_anyhow; use windmill_common::jobs::QueuedJob; @@ -9,11 +10,11 @@ use windmill_common::{error::Error, worker::to_raw_value}; use windmill_parser_sql::{ parse_bigquery_sig, parse_db_resource, parse_sql_blocks, parse_sql_statement_named_params, }; -use windmill_queue::{CanceledBy, HTTP_CLIENT}; +use windmill_queue::CanceledBy; use serde::Deserialize; -use crate::common::OccupancyMetrics; +use crate::common::{build_http_client, OccupancyMetrics}; use crate::handle_child::run_future_with_polling_update_job_poller; use crate::{ common::{build_args_values, resolve_job_timeout}, @@ -68,9 +69,10 @@ fn do_bigquery_inner<'a>( all_statement_values: &'a HashMap, project_id: &'a str, token: &'a str, - timeout_ms: i32, + timeout_ms: u64, column_order: Option<&'a mut Option>>, skip_collect: bool, + http_client: &'a Client, ) -> windmill_common::error::Result>>> { let param_names = parse_sql_statement_named_params(query, '@'); @@ -86,7 +88,7 @@ fn do_bigquery_inner<'a>( .collect::>(); let result_f = async move { - let response = HTTP_CLIENT + let response = http_client .post( "https://bigquery.googleapis.com/bigquery/v2/projects/".to_string() + project_id @@ -249,13 +251,10 @@ pub async fn do_bigquery( .await .map_err(|e| Error::ExecutionErr(e.to_string()))?; - let timeout_ms = i32::try_from( - resolve_job_timeout(&db, &job.workspace_id, job.id, job.timeout) - .await - .0 - .as_millis(), - ) - .unwrap_or(200000); + let (timeout_duration, _, _) = + resolve_job_timeout(&db, &job.workspace_id, job.id, job.timeout).await; + let timeout_ms = timeout_duration.as_millis() as u64; + let http_client = build_http_client(timeout_duration)?; let project_id = authentication_manager .project_id() @@ -325,6 +324,7 @@ pub async fn do_bigquery( timeout_ms, None, annotations.return_last_result && i < queries.len() - 1, + &http_client, ) }) .collect::>>()?; @@ -353,6 +353,7 @@ pub async fn do_bigquery( timeout_ms, Some(column_order), false, + &http_client, )? }; diff --git a/backend/windmill-worker/src/common.rs b/backend/windmill-worker/src/common.rs index 54d7307fd7db8..05444dd975475 100644 --- a/backend/windmill-worker/src/common.rs +++ b/backend/windmill-worker/src/common.rs @@ -3,6 +3,7 @@ use async_recursion::async_recursion; use itertools::Itertools; use lazy_static::lazy_static; use regex::Regex; +use reqwest::Client; use serde::{Deserialize, Serialize}; use serde_json::value::RawValue; use serde_json::{json, Value}; @@ -32,11 +33,7 @@ use windmill_common::{ use anyhow::{anyhow, Result}; use std::path::Path; -use std::{ - collections::HashMap, - sync::Arc, - time::Duration, -}; +use std::{collections::HashMap, sync::Arc, time::Duration}; use uuid::Uuid; use windmill_common::{variables, DB}; @@ -965,3 +962,12 @@ pub fn use_flow_root_path(flow_path: &str) -> String { return flow_path.to_string(); } } + +pub fn build_http_client(timeout_duration: std::time::Duration) -> error::Result { + reqwest::ClientBuilder::new() + .user_agent("windmill/beta") + .timeout(timeout_duration) + .connect_timeout(std::time::Duration::from_secs(10)) + .build() + .map_err(|e| Error::InternalErr(format!("Error building http client: {e:#}"))) +} diff --git a/backend/windmill-worker/src/graphql_executor.rs b/backend/windmill-worker/src/graphql_executor.rs index 67bb6a2c7ba64..929235cccf744 100644 --- a/backend/windmill-worker/src/graphql_executor.rs +++ b/backend/windmill-worker/src/graphql_executor.rs @@ -8,11 +8,11 @@ use windmill_common::jobs::QueuedJob; use windmill_common::worker::to_raw_value; use windmill_common::{error::Error, worker::CLOUD_HOSTED}; use windmill_parser_graphql::parse_graphql_sig; -use windmill_queue::{CanceledBy, HTTP_CLIENT}; +use windmill_queue::CanceledBy; use serde::Deserialize; -use crate::common::OccupancyMetrics; +use crate::common::{build_http_client, resolve_job_timeout, OccupancyMetrics}; use crate::handle_child::run_future_with_polling_update_job_poller; use crate::{common::build_args_map, AuthedClientBackgroundTask}; @@ -81,8 +81,12 @@ pub async fn do_graphql( } } } + let (timeout_duration, _, _) = + resolve_job_timeout(&db, &job.workspace_id, job.id, job.timeout).await; - let mut request = HTTP_CLIENT.post(api.base_url).json(&json!({ + let http_client = build_http_client(timeout_duration)?; + + let mut request = http_client.post(api.base_url).json(&json!({ "query": query, "variables": variables })); diff --git a/backend/windmill-worker/src/snowflake_executor.rs b/backend/windmill-worker/src/snowflake_executor.rs index 4e817a3927967..3315df6bc10c0 100644 --- a/backend/windmill-worker/src/snowflake_executor.rs +++ b/backend/windmill-worker/src/snowflake_executor.rs @@ -4,7 +4,7 @@ use core::fmt::Write; use futures::future::BoxFuture; use futures::{FutureExt, TryFutureExt}; use jsonwebtoken::{encode, Algorithm, EncodingKey, Header}; -use reqwest::Response; +use reqwest::{Client, Response}; use serde_json::{json, value::RawValue, Value}; use sha2::{Digest, Sha256}; use std::collections::HashMap; @@ -17,7 +17,7 @@ use windmill_queue::{CanceledBy, HTTP_CLIENT}; use serde::{Deserialize, Serialize}; -use crate::common::{resolve_job_timeout, OccupancyMetrics}; +use crate::common::{build_http_client, resolve_job_timeout, OccupancyMetrics}; use crate::handle_child::run_future_with_polling_update_job_poller; use crate::{common::build_args_values, AuthedClientBackgroundTask}; @@ -122,6 +122,7 @@ fn do_snowflake_inner<'a>( token_is_keypair: bool, column_order: Option<&'a mut Option>>, skip_collect: bool, + http_client: &'a Client, ) -> windmill_common::error::Result>>> { body.insert("statement".to_string(), json!(query)); @@ -145,7 +146,7 @@ fn do_snowflake_inner<'a>( } let result_f = async move { - let mut request = HTTP_CLIENT + let mut request = http_client .post(format!( "https://{}.snowflakecomputing.com/api/v2/statements/", account_identifier.to_uppercase() @@ -365,6 +366,11 @@ pub async fn do_snowflake( let queries = parse_sql_blocks(query); + let (timeout_duration, _, _) = + resolve_job_timeout(&db, &job.workspace_id, job.id, job.timeout).await; + + let http_client = build_http_client(timeout_duration)?; + let result_f = if queries.len() > 1 { let futures = queries .iter() @@ -379,6 +385,7 @@ pub async fn do_snowflake( token_is_keypair, None, annotations.return_last_result && i < queries.len() - 1, + &http_client, ) }) .collect::>>()?; @@ -407,6 +414,7 @@ pub async fn do_snowflake( token_is_keypair, Some(column_order), false, + &http_client, )? }; let r = run_future_with_polling_update_job_poller(