From 449164b79bf2a12a7ca845334eb49f926a272476 Mon Sep 17 00:00:00 2001 From: Daniel Vigovszky Date: Thu, 10 Oct 2024 16:53:38 +0200 Subject: [PATCH 1/4] Code cleanup and refactorings (#999) * Code cleanup and refactorings * Fixes * Clippy/format * Fixes --- Cargo.lock | 38 +++ Cargo.toml | 1 + golem-common/src/lib.rs | 26 ++ golem-common/src/model/component_metadata.rs | 10 + golem-component-service-base/Cargo.toml | 1 + .../src/api/common.rs | 21 +- .../src/repo/component.rs | 226 +++++------------- golem-component-service-base/src/repo/mod.rs | 32 --- .../src/service/component.rs | 167 +++++++------ golem-component-service/src/api/component.rs | 26 +- golem-service-base/src/config.rs | 17 ++ golem-service-base/src/lib.rs | 1 + golem-service-base/src/model.rs | 170 +++++++++++++ golem-service-base/src/repo.rs | 57 +++++ .../src/services/oplog/mod.rs | 31 +-- golem-worker-service-base/Cargo.toml | 1 + golem-worker-service-base/src/api/common.rs | 88 ++++--- golem-worker-service-base/src/api/error.rs | 31 ++- golem-worker-service-base/src/app_config.rs | 16 -- .../src/repo/api_definition.rs | 172 ++++--------- .../src/repo/api_deployment.rs | 174 ++++++-------- golem-worker-service-base/src/repo/mod.rs | 21 -- .../src/service/api_definition.rs | 73 ++++-- .../src/service/api_definition_validator.rs | 33 ++- .../src/service/api_deployment.rs | 78 +++--- .../src/service/component/default.rs | 128 +++++----- .../src/service/component/error.rs | 51 ++-- .../http/http_api_definition_validator.rs | 21 +- .../src/service/worker/default.rs | 28 +-- .../src/service/worker/error.rs | 37 ++- .../src/service/worker/routing_logic.rs | 13 +- .../src/api/api_definition.rs | 22 +- .../src/api/api_deployment.rs | 12 +- 33 files changed, 1015 insertions(+), 808 deletions(-) create mode 100644 golem-service-base/src/repo.rs diff --git a/Cargo.lock b/Cargo.lock index 56821c1e5..ee3283c92 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1704,6 +1704,18 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "conditional-trait-gen" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9359589034c900055ec8b3590ba1b45384b62379ffd505e3e9d641fd184d461d" +dependencies = [ + "proc-macro-error", + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "config" version = "0.11.0" @@ -3626,6 +3638,7 @@ dependencies = [ "bincode", "bytes 1.7.2", "chrono", + "conditional-trait-gen", "criterion", "fastrand 2.1.1", "golem-api-grpc", @@ -4093,6 +4106,7 @@ dependencies = [ "bincode", "bytes 1.7.2", "chrono", + "conditional-trait-gen", "criterion", "derive_more 0.99.18", "fastrand 2.1.1", @@ -6739,6 +6753,30 @@ dependencies = [ "toml_edit 0.22.22", ] +[[package]] +name = "proc-macro-error" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da25490ff9892aab3fcf7c36f08cfb902dd3e71ca0f9f9517bea02a73a5ce38c" +dependencies = [ + "proc-macro-error-attr", + "proc-macro2", + "quote", + "syn 1.0.109", + "version_check", +] + +[[package]] +name = "proc-macro-error-attr" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a1be40180e52ecc98ad80b184934baf3d0d29f979574e439af5a55274b35f869" +dependencies = [ + "proc-macro2", + "quote", + "version_check", +] + [[package]] name = "proc-macro2" version = "1.0.87" diff --git a/Cargo.toml b/Cargo.toml index a002ae803..9a7cec872 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -79,6 +79,7 @@ clap = { version = "4.5.4", features = [ "help", ] } cli-table = "0.4.7" +conditional-trait-gen = "0.4.1" console-subscriber = "0.3.0" ctor = "0.2.6" dashmap = "5.5.3" diff --git a/golem-common/src/lib.rs b/golem-common/src/lib.rs index 7c2aa4514..fddd9cbd0 100644 --- a/golem-common/src/lib.rs +++ b/golem-common/src/lib.rs @@ -12,6 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::fmt; +use std::fmt::{Display, Formatter}; + pub mod cache; pub mod client; pub mod config; @@ -27,3 +30,26 @@ pub mod retries; pub mod serialization; pub mod tracing; pub mod uri; + +/// Trait to convert a value to a string which is safe to return through a public API. +pub trait SafeDisplay { + fn to_safe_string(&self) -> String; +} + +pub struct SafeString(String); + +impl SafeDisplay for SafeString { + fn to_safe_string(&self) -> String { + self.0.clone() + } +} + +impl Display for SafeString { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.0) + } +} + +pub fn safe(value: String) -> impl SafeDisplay { + SafeString(value) +} diff --git a/golem-common/src/model/component_metadata.rs b/golem-common/src/model/component_metadata.rs index 558590a01..4df17cddb 100644 --- a/golem-common/src/model/component_metadata.rs +++ b/golem-common/src/model/component_metadata.rs @@ -15,6 +15,7 @@ use bincode::{Decode, Encode}; use std::fmt::{self, Display, Formatter}; +use crate::SafeDisplay; use golem_wasm_ast::analysis::AnalysedFunctionParameter; use golem_wasm_ast::core::Mem; use golem_wasm_ast::metadata::Producers as WasmAstProducers; @@ -363,6 +364,15 @@ pub enum ComponentProcessingError { Analysis(AnalysisFailure), } +impl SafeDisplay for ComponentProcessingError { + fn to_safe_string(&self) -> String { + match self { + ComponentProcessingError::Parsing(_) => self.to_string(), + ComponentProcessingError::Analysis(_) => self.to_string(), + } + } +} + impl Display for ComponentProcessingError { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { match self { diff --git a/golem-component-service-base/Cargo.toml b/golem-component-service-base/Cargo.toml index 2912be062..ddf8040ce 100644 --- a/golem-component-service-base/Cargo.toml +++ b/golem-component-service-base/Cargo.toml @@ -16,6 +16,7 @@ async-trait = { workspace = true } bincode = { workspace = true } bytes = { workspace = true } chrono = { workspace = true } +conditional-trait-gen = { workspace = true } http_02 = { workspace = true } prost = { workspace = true } prost-types = { workspace = true } diff --git a/golem-component-service-base/src/api/common.rs b/golem-component-service-base/src/api/common.rs index 8e3955ce0..d366aecd9 100644 --- a/golem-component-service-base/src/api/common.rs +++ b/golem-component-service-base/src/api/common.rs @@ -30,29 +30,40 @@ mod conversion { use crate::service::component; use golem_api_grpc::proto::golem::common::{ErrorBody, ErrorsBody}; use golem_api_grpc::proto::golem::component::v1::{component_error, ComponentError}; + use golem_common::SafeDisplay; impl From for ComponentError { fn from(value: component::ComponentError) -> Self { let error = match value { component::ComponentError::AlreadyExists(_) => { component_error::Error::AlreadyExists(ErrorBody { - error: value.to_string(), + error: value.to_safe_string(), }) } component::ComponentError::UnknownComponentId(_) | component::ComponentError::UnknownVersionedComponentId(_) => { component_error::Error::NotFound(ErrorBody { - error: value.to_string(), + error: value.to_safe_string(), }) } component::ComponentError::ComponentProcessingError(error) => { component_error::Error::BadRequest(ErrorsBody { - errors: vec![error.to_string()], + errors: vec![error.to_safe_string()], }) } - component::ComponentError::Internal(error) => { + component::ComponentError::InternalRepoError(_) => { component_error::Error::InternalError(ErrorBody { - error: error.to_string(), + error: value.to_safe_string(), + }) + } + component::ComponentError::InternalConversionError { .. } => { + component_error::Error::InternalError(ErrorBody { + error: value.to_safe_string(), + }) + } + component::ComponentError::ComponentStoreError { .. } => { + component_error::Error::InternalError(ErrorBody { + error: value.to_safe_string(), }) } }; diff --git a/golem-component-service-base/src/repo/component.rs b/golem-component-service-base/src/repo/component.rs index 31b5d3b52..89275dd62 100644 --- a/golem-component-service-base/src/repo/component.rs +++ b/golem-component-service-base/src/repo/component.rs @@ -12,18 +12,18 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::fmt::Display; -use std::ops::Deref; -use std::result::Result; -use std::sync::Arc; - use crate::model::Component; -use crate::repo::RepoError; use async_trait::async_trait; +use conditional_trait_gen::{trait_gen, when}; use golem_common::model::component_metadata::ComponentMetadata; use golem_common::model::{ComponentId, ComponentType}; use golem_service_base::model::{ComponentName, VersionedComponentId}; +use golem_service_base::repo::RepoError; use sqlx::{Database, Pool, Row}; +use std::fmt::Display; +use std::ops::Deref; +use std::result::Result; +use std::sync::Arc; use tracing::{debug, error}; use uuid::Uuid; @@ -229,8 +229,9 @@ impl ComponentRepo for LoggedComponentRepo sqlx::Postgres, sqlx::Sqlite)] #[async_trait] -impl ComponentRepo for DbComponentRepo { +impl ComponentRepo for DbComponentRepo { async fn create(&self, component: &ComponentRecord) -> Result<(), RepoError> { let mut transaction = self.db_pool.begin().await?; @@ -295,7 +296,7 @@ impl ComponentRepo for DbComponentRepo { cv.version AS version, cv.size AS size, cv.metadata AS metadata, - cv.created_at AS created_at + cv.created_at::timestamptz AS created_at, cv.component_type AS component_type FROM components c JOIN component_versions cv ON c.component_id = cv.component_id @@ -308,7 +309,8 @@ impl ComponentRepo for DbComponentRepo { .map_err(|e| e.into()) } - async fn get_all(&self, namespace: &str) -> Result, RepoError> { + #[when(sqlx::Postgres -> get_all)] + async fn get_all_postgres(&self, namespace: &str) -> Result, RepoError> { sqlx::query_as::<_, ComponentRecord>( r#" SELECT @@ -318,7 +320,7 @@ impl ComponentRepo for DbComponentRepo { cv.version AS version, cv.size AS size, cv.metadata AS metadata, - cv.created_at AS created_at, + cv.created_at::timestamptz AS created_at, cv.component_type AS component_type FROM components c JOIN component_versions cv ON c.component_id = cv.component_id @@ -331,10 +333,8 @@ impl ComponentRepo for DbComponentRepo { .map_err(|e| e.into()) } - async fn get_latest_version( - &self, - component_id: &Uuid, - ) -> Result, RepoError> { + #[when(sqlx::Sqlite -> get_all)] + async fn get_all_sqlite(&self, namespace: &str) -> Result, RepoError> { sqlx::query_as::<_, ComponentRecord>( r#" SELECT @@ -348,20 +348,19 @@ impl ComponentRepo for DbComponentRepo { cv.component_type AS component_type FROM components c JOIN component_versions cv ON c.component_id = cv.component_id - WHERE c.component_id = $1 - ORDER BY cv.version DESC LIMIT 1 + WHERE c.namespace = $1 "#, ) - .bind(component_id) - .fetch_optional(self.db_pool.deref()) + .bind(namespace) + .fetch_all(self.db_pool.deref()) .await .map_err(|e| e.into()) } - async fn get_by_version( + #[when(sqlx::Postgres -> get_latest_version)] + async fn get_latest_version_postgres( &self, component_id: &Uuid, - version: u64, ) -> Result, RepoError> { sqlx::query_as::<_, ComponentRecord>( r#" @@ -372,25 +371,25 @@ impl ComponentRepo for DbComponentRepo { cv.version AS version, cv.size AS size, cv.metadata AS metadata, - cv.created_at AS created_at, + cv.created_at::timestamptz AS created_at, cv.component_type AS component_type FROM components c JOIN component_versions cv ON c.component_id = cv.component_id - WHERE c.component_id = $1 AND cv.version = $2 + WHERE c.component_id = $1 + ORDER BY cv.version DESC LIMIT 1 "#, ) .bind(component_id) - .bind(version as i64) .fetch_optional(self.db_pool.deref()) .await .map_err(|e| e.into()) } - async fn get_by_name( + #[when(sqlx::Sqlite -> get_latest_version)] + async fn get_latest_version_sqlite( &self, - namespace: &str, - name: &str, - ) -> Result, RepoError> { + component_id: &Uuid, + ) -> Result, RepoError> { sqlx::query_as::<_, ComponentRecord>( r#" SELECT @@ -404,140 +403,22 @@ impl ComponentRepo for DbComponentRepo { cv.component_type AS component_type FROM components c JOIN component_versions cv ON c.component_id = cv.component_id - WHERE c.namespace = $1 AND c.name = $2 - "#, - ) - .bind(namespace) - .bind(name) - .fetch_all(self.db_pool.deref()) - .await - .map_err(|e| e.into()) - } - - async fn get_id_by_name(&self, namespace: &str, name: &str) -> Result, RepoError> { - let result = - sqlx::query("SELECT component_id FROM components WHERE namespace = $1 AND name = $2") - .bind(namespace) - .bind(name) - .fetch_optional(self.db_pool.deref()) - .await?; - - Ok(result.map(|x| x.get("component_id"))) - } - - async fn get_namespace(&self, component_id: &Uuid) -> Result, RepoError> { - let result = sqlx::query("SELECT namespace FROM components WHERE component_id = $1") - .bind(component_id) - .fetch_optional(self.db_pool.deref()) - .await?; - - Ok(result.map(|x| x.get("namespace"))) - } - - async fn delete(&self, namespace: &str, component_id: &Uuid) -> Result<(), RepoError> { - let mut transaction = self.db_pool.begin().await?; - sqlx::query( - r#" - DELETE FROM component_versions - WHERE component_id IN (SELECT component_id FROM components WHERE namespace = $1 AND component_id = $2) - "# - ) - .bind(namespace) - .bind(component_id) - .execute(&mut *transaction) - .await?; - - sqlx::query("DELETE FROM components WHERE namespace = $1 AND component_id = $2") - .bind(namespace) - .bind(component_id) - .execute(&mut *transaction) - .await?; - - transaction.commit().await?; - Ok(()) - } -} - -#[async_trait] -impl ComponentRepo for DbComponentRepo { - async fn create(&self, component: &ComponentRecord) -> Result<(), RepoError> { - let mut transaction = self.db_pool.begin().await?; - - let result = sqlx::query("SELECT namespace, name FROM components WHERE component_id = $1") - .bind(component.component_id) - .fetch_optional(&mut *transaction) - .await?; - - if let Some(result) = result { - let namespace: String = result.get("namespace"); - let name: String = result.get("name"); - if namespace != component.namespace || name != component.name { - return Err(RepoError::Internal( - "Component namespace and name invalid".to_string(), - )); - } - } else { - sqlx::query( - r#" - INSERT INTO components - (namespace, component_id, name) - VALUES - ($1, $2, $3) - "#, - ) - .bind(component.namespace.clone()) - .bind(component.component_id) - .bind(component.name.clone()) - .execute(&mut *transaction) - .await?; - } - - sqlx::query( - r#" - INSERT INTO component_versions - (component_id, version, size, metadata, created_at, component_type) - VALUES - ($1, $2, $3, $4, $5, $6) - "#, - ) - .bind(component.component_id) - .bind(component.version) - .bind(component.size) - .bind(component.metadata.clone()) - .bind(component.created_at) - .bind(component.component_type) - .execute(&mut *transaction) - .await?; - - transaction.commit().await?; - - Ok(()) - } - - async fn get(&self, component_id: &Uuid) -> Result, RepoError> { - sqlx::query_as::<_, ComponentRecord>( - r#" - SELECT - c.namespace AS namespace, - c.name AS name, - c.component_id AS component_id, - cv.version AS version, - cv.size AS size, - cv.metadata AS metadata, - cv.created_at::timestamptz AS created_at, - cv.component_type AS component_type - FROM components c - JOIN component_versions cv ON c.component_id = cv.component_id WHERE c.component_id = $1 + ORDER BY cv.version DESC LIMIT 1 "#, ) .bind(component_id) - .fetch_all(self.db_pool.deref()) + .fetch_optional(self.db_pool.deref()) .await .map_err(|e| e.into()) } - async fn get_all(&self, namespace: &str) -> Result, RepoError> { + #[when(sqlx::Postgres -> get_by_version)] + async fn get_by_version_postgres( + &self, + component_id: &Uuid, + version: u64, + ) -> Result, RepoError> { sqlx::query_as::<_, ComponentRecord>( r#" SELECT @@ -551,18 +432,21 @@ impl ComponentRepo for DbComponentRepo { cv.component_type AS component_type FROM components c JOIN component_versions cv ON c.component_id = cv.component_id - WHERE c.namespace = $1 + WHERE c.component_id = $1 AND cv.version = $2 "#, ) - .bind(namespace) - .fetch_all(self.db_pool.deref()) + .bind(component_id) + .bind(version as i64) + .fetch_optional(self.db_pool.deref()) .await .map_err(|e| e.into()) } - async fn get_latest_version( + #[when(sqlx::Sqlite -> get_by_version)] + async fn get_by_version_sqlite( &self, component_id: &Uuid, + version: u64, ) -> Result, RepoError> { sqlx::query_as::<_, ComponentRecord>( r#" @@ -573,25 +457,26 @@ impl ComponentRepo for DbComponentRepo { cv.version AS version, cv.size AS size, cv.metadata AS metadata, - cv.created_at::timestamptz AS created_at, + cv.created_at AS created_at, cv.component_type AS component_type FROM components c JOIN component_versions cv ON c.component_id = cv.component_id - WHERE c.component_id = $1 - ORDER BY cv.version DESC LIMIT 1 + WHERE c.component_id = $1 AND cv.version = $2 "#, ) .bind(component_id) + .bind(version as i64) .fetch_optional(self.db_pool.deref()) .await .map_err(|e| e.into()) } - async fn get_by_version( + #[when(sqlx::Postgres -> get_by_name)] + async fn get_by_name_postgres( &self, - component_id: &Uuid, - version: u64, - ) -> Result, RepoError> { + namespace: &str, + name: &str, + ) -> Result, RepoError> { sqlx::query_as::<_, ComponentRecord>( r#" SELECT @@ -605,17 +490,18 @@ impl ComponentRepo for DbComponentRepo { cv.component_type AS component_type FROM components c JOIN component_versions cv ON c.component_id = cv.component_id - WHERE c.component_id = $1 AND cv.version = $2 + WHERE c.namespace = $1 AND c.name = $2 "#, ) - .bind(component_id) - .bind(version as i64) - .fetch_optional(self.db_pool.deref()) + .bind(namespace) + .bind(name) + .fetch_all(self.db_pool.deref()) .await .map_err(|e| e.into()) } - async fn get_by_name( + #[when(sqlx::Sqlite -> get_by_name)] + async fn get_by_name_sqlite( &self, namespace: &str, name: &str, @@ -629,7 +515,7 @@ impl ComponentRepo for DbComponentRepo { cv.version AS version, cv.size AS size, cv.metadata AS metadata, - cv.created_at::timestamptz AS created_at, + cv.created_at AS created_at, cv.component_type AS component_type FROM components c JOIN component_versions cv ON c.component_id = cv.component_id diff --git a/golem-component-service-base/src/repo/mod.rs b/golem-component-service-base/src/repo/mod.rs index a32ccc71d..8e12fe008 100644 --- a/golem-component-service-base/src/repo/mod.rs +++ b/golem-component-service-base/src/repo/mod.rs @@ -13,35 +13,3 @@ // limitations under the License. pub mod component; - -use sqlx::error::ErrorKind; -use std::fmt::Display; - -#[derive(Debug)] -pub enum RepoError { - Internal(String), - UniqueViolation(String), -} - -impl From for RepoError { - fn from(error: sqlx::Error) -> Self { - if let Some(db_error) = error.as_database_error() { - if db_error.kind() == ErrorKind::UniqueViolation { - RepoError::UniqueViolation(db_error.to_string()) - } else { - RepoError::Internal(db_error.to_string()) - } - } else { - RepoError::Internal(error.to_string()) - } - } -} - -impl Display for RepoError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - RepoError::UniqueViolation(error) => write!(f, "{}", error), - RepoError::Internal(error) => write!(f, "{}", error), - } - } -} diff --git a/golem-component-service-base/src/service/component.rs b/golem-component-service-base/src/service/component.rs index 6fdfde511..f8f018fc7 100644 --- a/golem-component-service-base/src/service/component.rs +++ b/golem-component-service-base/src/service/component.rs @@ -13,23 +13,24 @@ // limitations under the License. use std::fmt::{Debug, Display}; +use std::num::TryFromIntError; use std::sync::Arc; +use crate::model::Component; +use crate::repo::component::ComponentRepo; use crate::service::component_compilation::ComponentCompilationService; use crate::service::component_processor::process_component; use async_trait::async_trait; use chrono::Utc; use golem_common::model::component_metadata::ComponentProcessingError; use golem_common::model::{ComponentId, ComponentType}; -use tap::TapFallible; -use tracing::{error, info}; - -use crate::model::Component; -use crate::repo::component::ComponentRepo; -use crate::repo::RepoError; +use golem_common::SafeDisplay; use golem_service_base::model::{ComponentName, VersionedComponentId}; +use golem_service_base::repo::RepoError; use golem_service_base::service::component_object_store::ComponentObjectStore; use golem_service_base::stream::ByteStream; +use tap::TapFallible; +use tracing::{error, info}; #[derive(Debug, thiserror::Error)] pub enum ComponentError { @@ -41,23 +42,47 @@ pub enum ComponentError { UnknownVersionedComponentId(VersionedComponentId), #[error(transparent)] ComponentProcessingError(#[from] ComponentProcessingError), - #[error("Internal error: {0}")] - Internal(anyhow::Error), + #[error("Internal repository error: {0}")] + InternalRepoError(RepoError), + #[error("Internal error: failed to convert {what}: {error}")] + InternalConversionError { what: String, error: String }, + #[error("Internal component store error: {message}: {error}")] + ComponentStoreError { message: String, error: String }, } impl ComponentError { - fn internal(error: E, context: C) -> Self - where - E: Display + Debug + Send + Sync + 'static, - C: Display + Send + Sync + 'static, - { - ComponentError::Internal(anyhow::Error::msg(error).context(context)) + pub fn conversion_error(what: impl AsRef, error: String) -> ComponentError { + Self::InternalConversionError { + what: what.as_ref().to_string(), + error, + } + } + + pub fn component_store_error(message: impl AsRef, error: anyhow::Error) -> ComponentError { + Self::ComponentStoreError { + message: message.as_ref().to_string(), + error: format!("{error}"), + } + } +} + +impl SafeDisplay for ComponentError { + fn to_safe_string(&self) -> String { + match self { + ComponentError::AlreadyExists(_) => self.to_string(), + ComponentError::UnknownComponentId(_) => self.to_string(), + ComponentError::UnknownVersionedComponentId(_) => self.to_string(), + ComponentError::ComponentProcessingError(inner) => inner.to_safe_string(), + ComponentError::InternalRepoError(inner) => inner.to_safe_string(), + ComponentError::InternalConversionError { .. } => self.to_string(), + ComponentError::ComponentStoreError { .. } => self.to_string(), + } } } impl From for ComponentError { fn from(error: RepoError) -> Self { - ComponentError::internal(error, "Repository error") + ComponentError::InternalRepoError(error) } } @@ -229,7 +254,7 @@ where let record = component .clone() .try_into() - .map_err(|e| ComponentError::internal(e, "Failed to convert record"))?; + .map_err(|e| ComponentError::conversion_error("record", e))?; let result = self.component_repo.create(&record).await; if let Err(RepoError::UniqueViolation(_)) = result { @@ -263,16 +288,15 @@ where .ok_or(ComponentError::UnknownComponentId(component_id.clone())) .and_then(|c| { c.try_into() - .map_err(|e| ComponentError::internal(e, "Failed to convert record")) + .map_err(|e| ComponentError::conversion_error("record", e)) }) .map(Component::next_version)?; info!(namespace = %namespace, "Uploaded component - exports {:?}", metadata.exports); - let component_size: u64 = data - .len() - .try_into() - .map_err(|e| ComponentError::internal(e, "Failed to convert data length"))?; + let component_size: u64 = data.len().try_into().map_err(|e: TryFromIntError| { + ComponentError::conversion_error("data length", e.to_string()) + })?; tokio::try_join!( self.upload_user_component(&next_component.versioned_component_id, data.clone()), @@ -289,7 +313,7 @@ where let record = component .clone() .try_into() - .map_err(|e| ComponentError::internal(e, "Failed to convert record"))?; + .map_err(|e| ComponentError::conversion_error("record", e))?; self.component_repo.create(&record).await?; @@ -319,7 +343,7 @@ where .tap_err( |e| error!(namespace = %namespace, "Error downloading component - error: {}", e), ) - .map_err(|e| ComponentError::internal(e.to_string(), "Error downloading component")) + .map_err(|e| ComponentError::component_store_error("Error downloading component", e)) } async fn download_stream( @@ -363,7 +387,7 @@ where .await .tap_err(|e| error!(namespace = %namespace, "Error getting component data - error: {}", e)) .map_err(|e| { - ComponentError::internal(e.to_string(), "Error retrieving component") + ComponentError::component_store_error( "Error retrieving component", e) })?; Ok(Some(data)) } @@ -371,19 +395,6 @@ where } } - async fn find_id_by_name( - &self, - component_name: &ComponentName, - namespace: &Namespace, - ) -> Result, ComponentError> { - info!(namespace = %namespace, "Find component id by name"); - let records = self - .component_repo - .get_id_by_name(namespace.to_string().as_str(), &component_name.0) - .await?; - Ok(records.map(ComponentId)) - } - async fn find_by_name( &self, component_name: Option, @@ -408,27 +419,22 @@ where .iter() .map(|c| c.clone().try_into()) .collect::>, _>>() - .map_err(|e| ComponentError::internal(e, "Failed to convert record".to_string()))?; + .map_err(|e| ComponentError::conversion_error("record", e))?; Ok(values) } - async fn get( + async fn find_id_by_name( &self, - component_id: &ComponentId, + component_name: &ComponentName, namespace: &Namespace, - ) -> Result>, ComponentError> { - info!(namespace = %namespace, "Get component"); - let records = self.component_repo.get(&component_id.0).await?; - - let values: Vec> = records - .iter() - .filter(|d| d.namespace == namespace.to_string()) - .map(|c| c.clone().try_into()) - .collect::>, _>>() - .map_err(|e| ComponentError::internal(e, "Failed to convert record".to_string()))?; - - Ok(values) + ) -> Result, ComponentError> { + info!(namespace = %namespace, "Find component id by name"); + let records = self + .component_repo + .get_id_by_name(namespace.to_string().as_str(), &component_name.0) + .await?; + Ok(records.map(ComponentId)) } async fn get_by_version( @@ -445,9 +451,9 @@ where match result { Some(c) if c.namespace == namespace.to_string() => { - let value = c.try_into().map_err(|e| { - ComponentError::internal(e, "Failed to convert record".to_string()) - })?; + let value = c + .try_into() + .map_err(|e| ComponentError::conversion_error("record", e))?; Ok(Some(value)) } _ => Ok(None), @@ -467,15 +473,33 @@ where match result { Some(c) if c.namespace == namespace.to_string() => { - let value = c.try_into().map_err(|e| { - ComponentError::internal(e, "Failed to convert record".to_string()) - })?; + let value = c + .try_into() + .map_err(|e| ComponentError::conversion_error("record", e))?; Ok(Some(value)) } _ => Ok(None), } } + async fn get( + &self, + component_id: &ComponentId, + namespace: &Namespace, + ) -> Result>, ComponentError> { + info!(namespace = %namespace, "Get component"); + let records = self.component_repo.get(&component_id.0).await?; + + let values: Vec> = records + .iter() + .filter(|d| d.namespace == namespace.to_string()) + .map(|c| c.clone().try_into()) + .collect::>, _>>() + .map_err(|e| ComponentError::conversion_error("record", e))?; + + Ok(values) + } + async fn get_namespace( &self, component_id: &ComponentId, @@ -483,9 +507,13 @@ where info!("Get component namespace"); let result = self.component_repo.get_namespace(&component_id.0).await?; if let Some(result) = result { - let value = result.clone().try_into().map_err(|e| { - ComponentError::internal(e, "Failed to convert namespace".to_string()) - })?; + let value = + result + .clone() + .try_into() + .map_err(|e: >::Error| { + ComponentError::conversion_error("namespace", e.to_string()) + })?; Ok(Some(value)) } else { Ok(None) @@ -513,13 +541,13 @@ where .delete(&self.get_protected_object_store_key(&versioned_component_id)) .await .map_err(|e| { - ComponentError::internal(e.to_string(), "Failed to delete component") + ComponentError::component_store_error("Failed to delete component", e) })?; self.object_store .delete(&self.get_user_object_store_key(&versioned_component_id)) .await .map_err(|e| { - ComponentError::internal(e.to_string(), "Failed to delete component") + ComponentError::component_store_error("Failed to delete component", e) })?; } self.component_repo @@ -549,7 +577,9 @@ impl ComponentServiceDefault { self.object_store .put(&self.get_user_object_store_key(user_component_id), data) .await - .map_err(|e| ComponentError::internal(e.to_string(), "Failed to upload user component")) + .map_err(|e| { + ComponentError::component_store_error("Failed to upload user component", e) + }) } async fn upload_protected_component( @@ -564,7 +594,7 @@ impl ComponentServiceDefault { ) .await .map_err(|e| { - ComponentError::internal(e.to_string(), "Failed to upload protected component") + ComponentError::component_store_error("Failed to upload protected component", e) }) } @@ -600,16 +630,17 @@ impl ComponentServiceDefault { #[cfg(test)] mod tests { - use crate::repo::RepoError; use crate::service::component::ComponentError; + use golem_common::SafeDisplay; + use golem_service_base::repo::RepoError; #[test] pub fn test_repo_error_to_service_error() { let repo_err = RepoError::Internal("some sql error".to_string()); let component_err: ComponentError = repo_err.into(); assert_eq!( - component_err.to_string(), - "Internal error: Repository error".to_string() + component_err.to_safe_string(), + "Internal repository error".to_string() ); } } diff --git a/golem-component-service/src/api/component.rs b/golem-component-service/src/api/component.rs index 6390036e6..cd6d26bea 100644 --- a/golem-component-service/src/api/component.rs +++ b/golem-component-service/src/api/component.rs @@ -31,7 +31,7 @@ use std::sync::Arc; use tracing::Instrument; use golem_common::metrics::api::TraceErrorKind; -use golem_common::recorded_http_api_request; +use golem_common::{recorded_http_api_request, SafeDisplay}; #[derive(ApiResponse, Debug, Clone)] pub enum ComponentError { @@ -77,22 +77,32 @@ impl From for ComponentError { ComponentServiceError::UnknownComponentId(_) | ComponentServiceError::UnknownVersionedComponentId(_) => { ComponentError::NotFound(Json(ErrorBody { - error: error.to_string(), + error: error.to_safe_string(), })) } ComponentServiceError::AlreadyExists(_) => { ComponentError::AlreadyExists(Json(ErrorBody { - error: error.to_string(), + error: error.to_safe_string(), })) } - ComponentServiceError::Internal(error) => { + ComponentServiceError::ComponentProcessingError(error) => { + ComponentError::BadRequest(Json(ErrorsBody { + errors: vec![error.to_safe_string()], + })) + } + ComponentServiceError::InternalRepoError(_) => { ComponentError::InternalError(Json(ErrorBody { - error: error.to_string(), + error: error.to_safe_string(), })) } - ComponentServiceError::ComponentProcessingError(error) => { - ComponentError::BadRequest(Json(ErrorsBody { - errors: vec![error.to_string()], + ComponentServiceError::InternalConversionError { .. } => { + ComponentError::InternalError(Json(ErrorBody { + error: error.to_safe_string(), + })) + } + ComponentServiceError::ComponentStoreError { .. } => { + ComponentError::InternalError(Json(ErrorBody { + error: error.to_safe_string(), })) } } diff --git a/golem-service-base/src/config.rs b/golem-service-base/src/config.rs index 5859d47f4..295324cdc 100644 --- a/golem-service-base/src/config.rs +++ b/golem-service-base/src/config.rs @@ -13,6 +13,7 @@ // limitations under the License. use serde::{Deserialize, Serialize}; +use std::time::Duration; #[derive(Clone, Debug, Serialize, Deserialize)] #[serde(tag = "type", content = "config")] @@ -41,3 +42,19 @@ pub struct ComponentStoreLocalConfig { pub root_path: String, pub object_prefix: String, } + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct WorkerExecutorClientCacheConfig { + pub max_capacity: usize, + #[serde(with = "humantime_serde")] + pub time_to_idle: Duration, +} + +impl Default for WorkerExecutorClientCacheConfig { + fn default() -> Self { + Self { + max_capacity: 1000, + time_to_idle: Duration::from_secs(60 * 60 * 4), + } + } +} diff --git a/golem-service-base/src/lib.rs b/golem-service-base/src/lib.rs index 970bc6227..f55e12fa3 100644 --- a/golem-service-base/src/lib.rs +++ b/golem-service-base/src/lib.rs @@ -16,6 +16,7 @@ pub mod auth; pub mod config; pub mod db; pub mod model; +pub mod repo; pub mod routing_table; pub mod service; pub mod stream; diff --git a/golem-service-base/src/model.rs b/golem-service-base/src/model.rs index c0af8cb26..7d1391ee6 100644 --- a/golem-service-base/src/model.rs +++ b/golem-service-base/src/model.rs @@ -19,6 +19,7 @@ use golem_common::model::{ ComponentId, ComponentType, ComponentVersion, PromiseId, ScanCursor, ShardId, Timestamp, WorkerFilter, WorkerId, WorkerStatus, }; +use golem_common::SafeDisplay; use golem_wasm_rpc::protobuf::type_annotated_value::TypeAnnotatedValue; use poem_openapi::{Enum, NewType, Object, Union}; use serde::{Deserialize, Serialize}; @@ -147,6 +148,12 @@ pub struct GolemErrorInvalidRequest { pub details: String, } +impl SafeDisplay for GolemErrorInvalidRequest { + fn to_safe_string(&self) -> String { + self.to_string() + } +} + impl From for GolemErrorInvalidRequest { fn from(value: golem_api_grpc::proto::golem::worker::v1::InvalidRequest) -> Self { Self { @@ -171,6 +178,12 @@ pub struct GolemErrorWorkerAlreadyExists { pub worker_id: WorkerId, } +impl SafeDisplay for GolemErrorWorkerAlreadyExists { + fn to_safe_string(&self) -> String { + self.to_string() + } +} + impl TryFrom for GolemErrorWorkerAlreadyExists { @@ -206,6 +219,12 @@ pub struct GolemErrorWorkerNotFound { pub worker_id: WorkerId, } +impl SafeDisplay for GolemErrorWorkerNotFound { + fn to_safe_string(&self) -> String { + self.to_string() + } +} + impl TryFrom for GolemErrorWorkerNotFound { @@ -240,6 +259,12 @@ pub struct GolemErrorWorkerCreationFailed { pub details: String, } +impl SafeDisplay for GolemErrorWorkerCreationFailed { + fn to_safe_string(&self) -> String { + self.to_string() + } +} + impl TryFrom for GolemErrorWorkerCreationFailed { @@ -278,6 +303,12 @@ pub struct GolemErrorFailedToResumeWorker { pub reason: Box, } +impl SafeDisplay for GolemErrorFailedToResumeWorker { + fn to_safe_string(&self) -> String { + self.to_string() + } +} + impl TryFrom for GolemErrorFailedToResumeWorker { @@ -316,6 +347,12 @@ pub struct GolemErrorComponentDownloadFailed { pub reason: String, } +impl SafeDisplay for GolemErrorComponentDownloadFailed { + fn to_safe_string(&self) -> String { + self.to_string() + } +} + impl TryFrom for GolemErrorComponentDownloadFailed { @@ -362,6 +399,12 @@ pub struct GolemErrorComponentParseFailed { pub reason: String, } +impl SafeDisplay for GolemErrorComponentParseFailed { + fn to_safe_string(&self) -> String { + self.to_string() + } +} + impl TryFrom for GolemErrorComponentParseFailed { @@ -408,6 +451,12 @@ pub struct GolemErrorGetLatestVersionOfComponentFailed { pub reason: String, } +impl SafeDisplay for GolemErrorGetLatestVersionOfComponentFailed { + fn to_safe_string(&self) -> String { + self.to_string() + } +} + impl TryFrom for GolemErrorGetLatestVersionOfComponentFailed { @@ -445,6 +494,12 @@ pub struct GolemErrorPromiseNotFound { pub promise_id: PromiseId, } +impl SafeDisplay for GolemErrorPromiseNotFound { + fn to_safe_string(&self) -> String { + self.to_string() + } +} + impl TryFrom for GolemErrorPromiseNotFound { @@ -478,6 +533,12 @@ pub struct GolemErrorPromiseDropped { pub promise_id: PromiseId, } +impl SafeDisplay for GolemErrorPromiseDropped { + fn to_safe_string(&self) -> String { + self.to_string() + } +} + impl TryFrom for GolemErrorPromiseDropped { @@ -511,6 +572,12 @@ pub struct GolemErrorPromiseAlreadyCompleted { pub promise_id: PromiseId, } +impl SafeDisplay for GolemErrorPromiseAlreadyCompleted { + fn to_safe_string(&self) -> String { + self.to_string() + } +} + impl TryFrom for GolemErrorPromiseAlreadyCompleted { @@ -547,6 +614,13 @@ impl From pub struct GolemErrorInterrupted { pub recover_immediately: bool, } + +impl SafeDisplay for GolemErrorInterrupted { + fn to_safe_string(&self) -> String { + self.to_string() + } +} + impl From for GolemErrorInterrupted { fn from(value: golem_api_grpc::proto::golem::worker::v1::Interrupted) -> Self { Self { @@ -569,6 +643,12 @@ pub struct GolemErrorParamTypeMismatch { pub details: String, } +impl SafeDisplay for GolemErrorParamTypeMismatch { + fn to_safe_string(&self) -> String { + self.to_string() + } +} + impl From for GolemErrorParamTypeMismatch { @@ -593,6 +673,12 @@ impl From #[error("No value in message")] pub struct GolemErrorNoValueInMessage {} +impl SafeDisplay for GolemErrorNoValueInMessage { + fn to_safe_string(&self) -> String { + self.to_string() + } +} + impl From for GolemErrorNoValueInMessage { @@ -615,6 +701,12 @@ pub struct GolemErrorValueMismatch { pub details: String, } +impl SafeDisplay for GolemErrorValueMismatch { + fn to_safe_string(&self) -> String { + self.to_string() + } +} + impl From for GolemErrorValueMismatch { fn from(value: golem_api_grpc::proto::golem::worker::v1::ValueMismatch) -> Self { Self { @@ -638,6 +730,12 @@ pub struct GolemErrorUnexpectedOplogEntry { pub got: String, } +impl SafeDisplay for GolemErrorUnexpectedOplogEntry { + fn to_safe_string(&self) -> String { + self.to_string() + } +} + impl From for GolemErrorUnexpectedOplogEntry { @@ -666,6 +764,12 @@ pub struct GolemErrorRuntimeError { pub details: String, } +impl SafeDisplay for GolemErrorRuntimeError { + fn to_safe_string(&self) -> String { + self.to_string() + } +} + impl From for GolemErrorRuntimeError { fn from(value: golem_api_grpc::proto::golem::worker::v1::RuntimeError) -> Self { Self { @@ -691,6 +795,12 @@ pub struct GolemErrorInvalidShardId { pub shard_ids: std::collections::HashSet, } +impl SafeDisplay for GolemErrorInvalidShardId { + fn to_safe_string(&self) -> String { + self.to_string() + } +} + impl TryFrom for GolemErrorInvalidShardId { @@ -720,6 +830,12 @@ pub struct GolemErrorPreviousInvocationFailed { pub details: String, } +impl SafeDisplay for GolemErrorPreviousInvocationFailed { + fn to_safe_string(&self) -> String { + self.to_string() + } +} + impl From for GolemErrorPreviousInvocationFailed { @@ -744,6 +860,12 @@ impl From #[error("Previous invocation exited")] pub struct GolemErrorPreviousInvocationExited {} +impl SafeDisplay for GolemErrorPreviousInvocationExited { + fn to_safe_string(&self) -> String { + self.to_string() + } +} + impl From for GolemErrorPreviousInvocationExited { @@ -766,6 +888,12 @@ pub struct GolemErrorUnknown { pub details: String, } +impl SafeDisplay for GolemErrorUnknown { + fn to_safe_string(&self) -> String { + self.to_string() + } +} + impl From for GolemErrorUnknown { fn from(value: golem_api_grpc::proto::golem::worker::v1::UnknownError) -> Self { Self { @@ -786,6 +914,12 @@ impl From for golem_api_grpc::proto::golem::worker::v1::Unkno #[error("Invalid account")] pub struct GolemErrorInvalidAccount {} +impl SafeDisplay for GolemErrorInvalidAccount { + fn to_safe_string(&self) -> String { + self.to_string() + } +} + impl From for GolemErrorInvalidAccount { fn from(_value: golem_api_grpc::proto::golem::worker::v1::InvalidAccount) -> Self { Self {} @@ -802,6 +936,12 @@ impl From for golem_api_grpc::proto::golem::worker::v1 #[error("Invalid account")] pub struct GolemErrorShardingNotReady {} +impl SafeDisplay for GolemErrorShardingNotReady { + fn to_safe_string(&self) -> String { + self.to_string() + } +} + impl From for crate::model::GolemErrorShardingNotReady { @@ -1195,6 +1335,36 @@ pub enum GolemError { ShardingNotReady(GolemErrorShardingNotReady), } +impl SafeDisplay for GolemError { + fn to_safe_string(&self) -> String { + match self { + GolemError::InvalidRequest(inner) => inner.to_safe_string(), + GolemError::WorkerAlreadyExists(inner) => inner.to_safe_string(), + GolemError::WorkerNotFound(inner) => inner.to_safe_string(), + GolemError::WorkerCreationFailed(inner) => inner.to_safe_string(), + GolemError::FailedToResumeWorker(inner) => inner.to_safe_string(), + GolemError::ComponentDownloadFailed(inner) => inner.to_safe_string(), + GolemError::ComponentParseFailed(inner) => inner.to_safe_string(), + GolemError::GetLatestVersionOfComponentFailed(inner) => inner.to_safe_string(), + GolemError::PromiseNotFound(inner) => inner.to_safe_string(), + GolemError::PromiseDropped(inner) => inner.to_safe_string(), + GolemError::PromiseAlreadyCompleted(inner) => inner.to_safe_string(), + GolemError::Interrupted(inner) => inner.to_safe_string(), + GolemError::ParamTypeMismatch(inner) => inner.to_safe_string(), + GolemError::NoValueInMessage(inner) => inner.to_safe_string(), + GolemError::ValueMismatch(inner) => inner.to_safe_string(), + GolemError::UnexpectedOplogEntry(inner) => inner.to_safe_string(), + GolemError::RuntimeError(inner) => inner.to_safe_string(), + GolemError::InvalidShardId(inner) => inner.to_safe_string(), + GolemError::PreviousInvocationFailed(inner) => inner.to_safe_string(), + GolemError::PreviousInvocationExited(inner) => inner.to_safe_string(), + GolemError::Unknown(inner) => inner.to_safe_string(), + GolemError::InvalidAccount(inner) => inner.to_safe_string(), + GolemError::ShardingNotReady(inner) => inner.to_safe_string(), + } + } +} + impl TryFrom for GolemError { type Error = String; diff --git a/golem-service-base/src/repo.rs b/golem-service-base/src/repo.rs new file mode 100644 index 000000000..1b0801a66 --- /dev/null +++ b/golem-service-base/src/repo.rs @@ -0,0 +1,57 @@ +// Copyright 2024 Golem Cloud +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use golem_common::SafeDisplay; +use sqlx::error::ErrorKind; +use std::fmt::Display; + +#[derive(Debug, thiserror::Error)] +pub enum RepoError { + Internal(String), + UniqueViolation(String), +} + +impl From for RepoError { + fn from(error: sqlx::Error) -> Self { + if let Some(db_error) = error.as_database_error() { + if db_error.kind() == ErrorKind::UniqueViolation { + RepoError::UniqueViolation(db_error.to_string()) + } else { + RepoError::Internal(db_error.to_string()) + } + } else { + RepoError::Internal(error.to_string()) + } + } +} + +impl Display for RepoError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + RepoError::UniqueViolation(error) => write!(f, "{}", error), + RepoError::Internal(error) => write!(f, "{}", error), + } + } +} + +impl SafeDisplay for RepoError { + fn to_safe_string(&self) -> String { + match self { + RepoError::Internal(_) => "Internal repository error".to_string(), + RepoError::UniqueViolation(_) => { + "Internal repository error (unique key violation)".to_string() + } + } + } +} diff --git a/golem-worker-executor-base/src/services/oplog/mod.rs b/golem-worker-executor-base/src/services/oplog/mod.rs index faf7f94d0..ea5e33848 100644 --- a/golem-worker-executor-base/src/services/oplog/mod.rs +++ b/golem-worker-executor-base/src/services/oplog/mod.rs @@ -22,9 +22,8 @@ use std::time::Duration; use async_trait::async_trait; use bincode::{Decode, Encode}; -use bytes::Bytes; - pub use blob::BlobOplogArchiveService; +use bytes::Bytes; pub use compressed::{CompressedOplogArchive, CompressedOplogArchiveService, CompressedOplogChunk}; use golem_common::cache::{BackgroundEvictionMode, Cache, FullCacheEvictionMode}; use golem_common::model::oplog::{ @@ -37,6 +36,7 @@ use golem_common::model::{ use golem_common::serialization::{serialize, try_deserialize}; pub use multilayer::{MultiLayerOplog, MultiLayerOplogService, OplogArchiveService}; pub use primary::PrimaryOplogService; +use tracing::Instrument; use crate::error::GolemError; @@ -375,18 +375,21 @@ impl OpenOplogs { worker_id, || Ok(()), |_| { - Box::pin(async move { - let result = constructor_clone.create_oplog(close).await; - - // Temporarily increasing ref count because we want to store a weak pointer - // but not drop it before we re-gain a strong reference when got out of the cache - let result = unsafe { - let ptr = Arc::into_raw(result); - Arc::increment_strong_count(ptr); - Arc::from_raw(ptr) - }; - Ok(OpenOplogEntry::new(result)) - }) + Box::pin( + async move { + let result = constructor_clone.create_oplog(close).await; + + // Temporarily increasing ref count because we want to store a weak pointer + // but not drop it before we re-gain a strong reference when got out of the cache + let result = unsafe { + let ptr = Arc::into_raw(result); + Arc::increment_strong_count(ptr); + Arc::from_raw(ptr) + }; + Ok(OpenOplogEntry::new(result)) + } + .in_current_span(), + ) }, ) .await diff --git a/golem-worker-service-base/Cargo.toml b/golem-worker-service-base/Cargo.toml index 140cc45cf..602976360 100644 --- a/golem-worker-service-base/Cargo.toml +++ b/golem-worker-service-base/Cargo.toml @@ -22,6 +22,7 @@ async-trait = { workspace = true } bincode = { workspace = true } bytes = { workspace = true } chrono = { workspace = true } +conditional-trait-gen = { workspace = true } derive_more = { workspace = true } figment = { workspace = true } futures = { workspace = true } diff --git a/golem-worker-service-base/src/api/common.rs b/golem-worker-service-base/src/api/common.rs index 72695f4e6..59cc15146 100644 --- a/golem-worker-service-base/src/api/common.rs +++ b/golem-worker-service-base/src/api/common.rs @@ -1,9 +1,10 @@ -use std::fmt::{Debug, Display, Formatter}; +use std::fmt::{Debug, Formatter}; use crate::service::http::http_api_definition_validator::RouteValidationError; use golem_api_grpc::proto::golem::apidefinition::v1::{api_definition_error, ApiDefinitionError}; use golem_api_grpc::proto::golem::worker; use golem_common::metrics::api::TraceErrorKind; +use golem_common::SafeDisplay; use golem_service_base::model::ErrorBody; use poem_openapi::payload::Json; use poem_openapi::{ApiResponse, Object, Union}; @@ -57,40 +58,40 @@ impl TraceErrorKind for ApiEndpointError { } impl ApiEndpointError { - pub fn unauthorized(error: T) -> Self { + pub fn unauthorized(error: T) -> Self { Self::Unauthorized(Json(ErrorBody { - error: error.to_string(), + error: error.to_safe_string(), })) } - pub fn forbidden(error: T) -> Self { + pub fn forbidden(error: T) -> Self { Self::Forbidden(Json(ErrorBody { - error: error.to_string(), + error: error.to_safe_string(), })) } - pub fn internal(error: T) -> Self { + pub fn internal(error: T) -> Self { Self::InternalError(Json(ErrorBody { - error: error.to_string(), + error: error.to_safe_string(), })) } - pub fn bad_request(error: T) -> Self { + pub fn bad_request(error: T) -> Self { Self::BadRequest(Json(WorkerServiceErrorsBody::Messages( MessagesErrorsBody { - errors: vec![error.to_string()], + errors: vec![error.to_safe_string()], }, ))) } - pub fn not_found(error: T) -> Self { + pub fn not_found(error: T) -> Self { Self::NotFound(Json(ErrorBody { - error: error.to_string(), + error: error.to_safe_string(), })) } - pub fn already_exists(error: T) -> Self { - Self::AlreadyExists(Json(error.to_string())) + pub fn already_exists(error: T) -> Self { + Self::AlreadyExists(Json(error.to_safe_string())) } } @@ -145,6 +146,7 @@ impl<'a> TraceErrorKind for ApiDefinitionTraceErrorKind<'a> { } mod conversion { + use super::{ApiEndpointError, ValidationErrorsBody, WorkerServiceErrorsBody}; use crate::service::api_definition::ApiDefinitionError as ApiDefinitionServiceError; use crate::service::api_definition_validator::ValidationErrors; use crate::service::api_deployment::ApiDeploymentError; @@ -155,20 +157,16 @@ mod conversion { apidefinition::v1::{api_definition_error, ApiDefinitionError, RouteValidationErrorsBody}, common::ErrorBody, }; + use golem_common::SafeDisplay; use poem_openapi::payload::Json; use std::fmt::Display; - use super::{ApiEndpointError, ValidationErrorsBody, WorkerServiceErrorsBody}; - impl From> for ApiEndpointError { fn from(error: ApiDefinitionServiceError) -> Self { match error { ApiDefinitionServiceError::ValidationError(e) => e.into(), - e @ ApiDefinitionServiceError::ComponentNotFoundError(_) => { - ApiEndpointError::bad_request(e) - } - ApiDefinitionServiceError::InternalError(error) => { - ApiEndpointError::internal(error) + ApiDefinitionServiceError::ComponentNotFoundError(_) => { + ApiEndpointError::bad_request(error) } ApiDefinitionServiceError::ApiDefinitionNotDraft(_) => { ApiEndpointError::bad_request(error) @@ -185,6 +183,10 @@ mod conversion { ApiDefinitionServiceError::RibCompilationErrors(_) => { ApiEndpointError::bad_request(error) } + ApiDefinitionServiceError::InternalRepoError(_) => { + ApiEndpointError::internal(error) + } + ApiDefinitionServiceError::Internal(_) => ApiEndpointError::internal(error), } } } @@ -192,18 +194,21 @@ mod conversion { impl From> for ApiEndpointError { fn from(error: ApiDeploymentError) -> Self { match error { - ApiDeploymentError::InternalError(error) => ApiEndpointError::internal(error), - e @ ApiDeploymentError::ApiDefinitionNotFound(_, _) => { - ApiEndpointError::not_found(e) + ApiDeploymentError::ApiDefinitionNotFound(_, _) => { + ApiEndpointError::not_found(error) } - e @ ApiDeploymentError::ApiDeploymentNotFound(_, _) => { - ApiEndpointError::not_found(e) + ApiDeploymentError::ApiDeploymentNotFound(_, _) => { + ApiEndpointError::not_found(error) } - e @ ApiDeploymentError::ApiDeploymentConflict(_) => { - ApiEndpointError::already_exists(e) + ApiDeploymentError::ApiDeploymentConflict(_) => { + ApiEndpointError::already_exists(error) } - e @ ApiDeploymentError::ApiDefinitionsConflict(_) => { - ApiEndpointError::bad_request(e) + ApiDeploymentError::ApiDefinitionsConflict(_) => { + ApiEndpointError::bad_request(error) + } + ApiDeploymentError::InternalRepoError(_) => ApiEndpointError::internal(error), + ApiDeploymentError::InternalConversionError { .. } => { + ApiEndpointError::internal(error) } } } @@ -250,32 +255,27 @@ mod conversion { } ApiDefinitionServiceError::RibCompilationErrors(_) => ApiDefinitionError { error: Some(api_definition_error::Error::NotFound(ErrorBody { - error: error.to_string(), + error: error.to_safe_string(), })), }, ApiDefinitionServiceError::ApiDefinitionNotFound(_) => ApiDefinitionError { error: Some(api_definition_error::Error::NotFound(ErrorBody { - error: error.to_string(), + error: error.to_safe_string(), })), }, ApiDefinitionServiceError::ApiDefinitionNotDraft(_) => ApiDefinitionError { error: Some(api_definition_error::Error::NotDraft(ErrorBody { - error: error.to_string(), + error: error.to_safe_string(), })), }, ApiDefinitionServiceError::ApiDefinitionAlreadyExists(_) => ApiDefinitionError { error: Some(api_definition_error::Error::AlreadyExists(ErrorBody { - error: error.to_string(), + error: error.to_safe_string(), })), }, ApiDefinitionServiceError::ApiDefinitionDeployed(_) => ApiDefinitionError { error: Some(api_definition_error::Error::BadRequest(ErrorsBody { - errors: vec![error.to_string()], - })), - }, - ApiDefinitionServiceError::InternalError(error) => ApiDefinitionError { - error: Some(api_definition_error::Error::InternalError(ErrorBody { - error: error.to_string(), + errors: vec![error.to_safe_string()], })), }, ApiDefinitionServiceError::ComponentNotFoundError(error) => ApiDefinitionError { @@ -290,6 +290,16 @@ mod conversion { ), })), }, + ApiDefinitionServiceError::InternalRepoError(_) => ApiDefinitionError { + error: Some(api_definition_error::Error::InternalError(ErrorBody { + error: error.to_safe_string(), + })), + }, + ApiDefinitionServiceError::Internal(_) => ApiDefinitionError { + error: Some(api_definition_error::Error::InternalError(ErrorBody { + error: error.to_safe_string(), + })), + }, } } } diff --git a/golem-worker-service-base/src/api/error.rs b/golem-worker-service-base/src/api/error.rs index e0cb43659..95c1460b9 100644 --- a/golem-worker-service-base/src/api/error.rs +++ b/golem-worker-service-base/src/api/error.rs @@ -1,12 +1,12 @@ +use crate::service::component::ComponentServiceError; +use crate::service::worker::WorkerServiceError; use golem_common::metrics::api::TraceErrorKind; +use golem_common::SafeDisplay; use golem_service_base::model::*; use poem_openapi::payload::Json; use poem_openapi::*; use tonic::Status; -use crate::service::component::ComponentServiceError; -use crate::service::worker::WorkerServiceError; - // The dependents og golem-worker-service-base // is expected to exposer worker api endpoints // that can rely on WorkerApiBaseError @@ -81,20 +81,21 @@ impl From for WorkerApiBaseError { } match error { - ServiceError::Internal(_) => internal(error.to_string()), + ServiceError::Internal(_) => internal(error.to_safe_string()), ServiceError::TypeChecker(_) => WorkerApiBaseError::BadRequest(Json(ErrorsBody { - errors: vec![error.to_string()], + errors: vec![error.to_safe_string()], })), ServiceError::VersionedComponentIdNotFound(_) | ServiceError::ComponentNotFound(_) | ServiceError::AccountIdNotFound(_) | ServiceError::WorkerNotFound(_) => WorkerApiBaseError::NotFound(Json(ErrorBody { - error: error.to_string(), + error: error.to_safe_string(), })), ServiceError::Golem(golem_error) => { WorkerApiBaseError::InternalError(Json(GolemErrorBody { golem_error })) } ServiceError::Component(error) => error.into(), + ServiceError::InternalCallError(_) => internal(error.to_safe_string()), } } } @@ -110,9 +111,7 @@ impl From for WorkerApiBaseError { } ComponentServiceError::Internal(error) => { WorkerApiBaseError::InternalError(Json(GolemErrorBody { - golem_error: GolemError::Unknown(GolemErrorUnknown { - details: error.to_string(), - }), + golem_error: GolemError::Unknown(GolemErrorUnknown { details: error }), })) } @@ -125,6 +124,20 @@ impl From for WorkerApiBaseError { ComponentServiceError::Forbidden(error) => { WorkerApiBaseError::Forbidden(Json(ErrorBody { error })) } + ComponentServiceError::FailedGrpcStatus(_) => { + WorkerApiBaseError::InternalError(Json(GolemErrorBody { + golem_error: GolemError::Unknown(GolemErrorUnknown { + details: value.to_safe_string(), + }), + })) + } + ComponentServiceError::FailedTransport(_) => { + WorkerApiBaseError::InternalError(Json(GolemErrorBody { + golem_error: GolemError::Unknown(GolemErrorUnknown { + details: value.to_safe_string(), + }), + })) + } } } } diff --git a/golem-worker-service-base/src/app_config.rs b/golem-worker-service-base/src/app_config.rs index 4de95cf29..d6fe2c3b2 100644 --- a/golem-worker-service-base/src/app_config.rs +++ b/golem-worker-service-base/src/app_config.rs @@ -27,22 +27,6 @@ pub struct WorkerServiceBaseConfig { pub worker_executor_retries: RetryConfig, } -#[derive(Clone, Debug, Serialize, Deserialize)] -pub struct WorkerExecutorClientCacheConfig { - pub max_capacity: usize, - #[serde(with = "humantime_serde")] - pub time_to_idle: Duration, -} - -impl Default for WorkerExecutorClientCacheConfig { - fn default() -> Self { - Self { - max_capacity: 1000, - time_to_idle: Duration::from_secs(60 * 60 * 4), - } - } -} - impl WorkerServiceBaseConfig { pub fn is_local_env(&self) -> bool { self.environment.to_lowercase() == "local" diff --git a/golem-worker-service-base/src/repo/api_definition.rs b/golem-worker-service-base/src/repo/api_definition.rs index 97d83f639..3df3f25f0 100644 --- a/golem-worker-service-base/src/repo/api_definition.rs +++ b/golem-worker-service-base/src/repo/api_definition.rs @@ -13,8 +13,9 @@ // limitations under the License. use crate::api_definition::http::{CompiledHttpApiDefinition, HttpApiDefinition}; -use crate::repo::RepoError; use async_trait::async_trait; +use conditional_trait_gen::{trait_gen, when}; +use golem_service_base::repo::RepoError; use sqlx::{Database, Pool, Row}; use std::fmt::Display; use std::ops::Deref; @@ -121,8 +122,9 @@ impl DbApiDefinitionRepo { } } +#[trait_gen(sqlx::Postgres -> sqlx::Postgres, sqlx::Sqlite)] #[async_trait] -impl ApiDefinitionRepo for DbApiDefinitionRepo { +impl ApiDefinitionRepo for DbApiDefinitionRepo { async fn create(&self, definition: &ApiDefinitionRecord) -> Result<(), RepoError> { sqlx::query( r#" @@ -157,6 +159,7 @@ impl ApiDefinitionRepo for DbApiDefinitionRepo { .bind(definition.version.clone()) .bind(definition.draft) .bind(definition.data.clone()) + .bind(definition.created_at) .execute(self.db_pool.deref()) .await?; @@ -187,7 +190,24 @@ impl ApiDefinitionRepo for DbApiDefinitionRepo { Ok(()) } - async fn get( + #[when(sqlx::Postgres -> get)] + async fn get_postgres( + &self, + namespace: &str, + id: &str, + version: &str, + ) -> Result, RepoError> { + sqlx::query_as::<_, ApiDefinitionRecord>("SELECT namespace, id, version, draft, data, created_at::timestamptz FROM api_definitions WHERE namespace = $1 AND id = $2 AND version = $3") + .bind(namespace) + .bind(id) + .bind(version) + .fetch_optional(self.db_pool.deref()) + .await + .map_err(|e| e.into()) + } + + #[when(sqlx::Sqlite -> get)] + async fn get_sqlite( &self, namespace: &str, id: &str, @@ -234,9 +254,13 @@ impl ApiDefinitionRepo for DbApiDefinitionRepo { Ok(result.rows_affected() > 0) } - async fn get_all(&self, namespace: &str) -> Result, RepoError> { + #[when(sqlx::Postgres -> get_all)] + async fn get_all_postgres( + &self, + namespace: &str, + ) -> Result, RepoError> { sqlx::query_as::<_, ApiDefinitionRecord>( - "SELECT namespace, id, version, draft, data, created_at FROM api_definitions WHERE namespace = $1", + "SELECT namespace, id, version, draft, data, created_at::timestamptz FROM api_definitions WHERE namespace = $1", ) .bind(namespace) .fetch_all(self.db_pool.deref()) @@ -244,150 +268,38 @@ impl ApiDefinitionRepo for DbApiDefinitionRepo { .map_err(|e| e.into()) } - async fn get_all_versions( - &self, - namespace: &str, - id: &str, - ) -> Result, RepoError> { - sqlx::query_as::<_, ApiDefinitionRecord>("SELECT namespace, id, version, draft, data, created_at FROM api_definitions WHERE namespace = $1 AND id = $2") + #[when(sqlx::Sqlite -> get_all)] + async fn get_all_sqlite(&self, namespace: &str) -> Result, RepoError> { + sqlx::query_as::<_, ApiDefinitionRecord>( + "SELECT namespace, id, version, draft, data, created_at FROM api_definitions WHERE namespace = $1", + ) .bind(namespace) - .bind(id) .fetch_all(self.db_pool.deref()) .await .map_err(|e| e.into()) } -} -#[async_trait] -impl ApiDefinitionRepo for DbApiDefinitionRepo { - async fn create(&self, definition: &ApiDefinitionRecord) -> Result<(), RepoError> { - sqlx::query( - r#" - INSERT INTO api_definitions - (namespace, id, version, draft, data, created_at) - VALUES - ($1, $2, $3, $4, $5, $6) - "#, - ) - .bind(definition.namespace.clone()) - .bind(definition.id.clone()) - .bind(definition.version.clone()) - .bind(definition.draft) - .bind(definition.data.clone()) - .bind(definition.created_at) - .execute(self.db_pool.deref()) - .await?; - - Ok(()) - } - - async fn update(&self, definition: &ApiDefinitionRecord) -> Result<(), RepoError> { - sqlx::query( - r#" - UPDATE api_definitions - SET draft = $4, data = $5 - WHERE namespace = $1 AND id = $2 AND version = $3 - "#, - ) - .bind(definition.namespace.clone()) - .bind(definition.id.clone()) - .bind(definition.version.clone()) - .bind(definition.draft) - .bind(definition.data.clone()) - .bind(definition.created_at) - .execute(self.db_pool.deref()) - .await?; - - Ok(()) - } - - async fn set_draft( + #[when(sqlx::Postgres -> get_all_versions)] + async fn get_all_versions_postgres( &self, namespace: &str, id: &str, - version: &str, - draft: bool, - ) -> Result<(), RepoError> { - sqlx::query( - r#" - UPDATE api_definitions - SET draft = $4 - WHERE namespace = $1 AND id = $2 AND version = $3 - "#, - ) - .bind(namespace) - .bind(id) - .bind(version) - .bind(draft) - .execute(self.db_pool.deref()) - .await?; - - Ok(()) - } - - async fn get( - &self, - namespace: &str, - id: &str, - version: &str, - ) -> Result, RepoError> { - sqlx::query_as::<_, ApiDefinitionRecord>("SELECT namespace, id, version, draft, data, created_at::timestamptz FROM api_definitions WHERE namespace = $1 AND id = $2 AND version = $3") + ) -> Result, RepoError> { + sqlx::query_as::<_, ApiDefinitionRecord>("SELECT namespace, id, version, draft, data, created_at::timestamptz FROM api_definitions WHERE namespace = $1 AND id = $2") .bind(namespace) .bind(id) - .bind(version) - .fetch_optional(self.db_pool.deref()) + .fetch_all(self.db_pool.deref()) .await .map_err(|e| e.into()) } - async fn get_draft( - &self, - namespace: &str, - id: &str, - version: &str, - ) -> Result, RepoError> { - let result = sqlx::query( - "SELECT draft FROM api_definitions WHERE namespace = $1 AND id = $2 AND version = $3", - ) - .bind(namespace) - .bind(id) - .bind(version) - .fetch_optional(self.db_pool.deref()) - .await?; - - let draft: Option = result.map(|r| r.get("draft")); - Ok(draft) - } - - async fn delete(&self, namespace: &str, id: &str, version: &str) -> Result { - let result = sqlx::query( - "DELETE FROM api_definitions WHERE namespace = $1 AND id = $2 AND version = $3", - ) - .bind(namespace) - .bind(id) - .bind(version) - .execute(self.db_pool.deref()) - .await?; - - Ok(result.rows_affected() > 0) - } - - async fn get_all(&self, namespace: &str) -> Result, RepoError> { - sqlx::query_as::<_, ApiDefinitionRecord>( - "SELECT namespace, id, version, draft, data, created_at::timestamptz FROM api_definitions WHERE namespace = $1", - ) - .bind(namespace) - .fetch_all(self.db_pool.deref()) - .await - .map_err(|e| e.into()) - } - - async fn get_all_versions( + #[when(sqlx::Sqlite -> get_all_versions)] + async fn get_all_versions_sqlite( &self, namespace: &str, id: &str, ) -> Result, RepoError> { - sqlx::query_as::<_, ApiDefinitionRecord>("SELECT namespace, id, version, draft, data, created_at::timestamptz FROM api_definitions WHERE namespace = $1 AND id = $2") + sqlx::query_as::<_, ApiDefinitionRecord>("SELECT namespace, id, version, draft, data, created_at FROM api_definitions WHERE namespace = $1 AND id = $2") .bind(namespace) .bind(id) .fetch_all(self.db_pool.deref()) diff --git a/golem-worker-service-base/src/repo/api_deployment.rs b/golem-worker-service-base/src/repo/api_deployment.rs index 93039dcd2..2f09fe940 100644 --- a/golem-worker-service-base/src/repo/api_deployment.rs +++ b/golem-worker-service-base/src/repo/api_deployment.rs @@ -14,9 +14,10 @@ use crate::api_definition::ApiSite; use crate::repo::api_definition::ApiDefinitionRecord; -use crate::repo::RepoError; use crate::service::api_definition::ApiDefinitionIdWithVersion; use async_trait::async_trait; +use conditional_trait_gen::{trait_gen, when}; +use golem_service_base::repo::RepoError; use sqlx::{Database, Pool}; use std::fmt::Display; use std::ops::Deref; @@ -89,8 +90,9 @@ impl DbApiDeploymentRepo { } } +#[trait_gen(sqlx::Postgres -> sqlx::Postgres, sqlx::Sqlite)] #[async_trait] -impl ApiDeploymentRepo for DbApiDeploymentRepo { +impl ApiDeploymentRepo for DbApiDeploymentRepo { async fn create(&self, deployments: Vec) -> Result<(), RepoError> { if !deployments.is_empty() { let mut transaction = self.db_pool.begin().await?; @@ -139,14 +141,15 @@ impl ApiDeploymentRepo for DbApiDeploymentRepo { } } - async fn get_by_id( + #[when(sqlx::Postgres -> get_by_id)] + async fn get_by_id_postgres( &self, namespace: &str, definition_id: &str, ) -> Result, RepoError> { sqlx::query_as::<_, ApiDeploymentRecord>( r#" - SELECT namespace, site, host, subdomain, definition_id, definition_version, created_at + SELECT namespace, site, host, subdomain, definition_id, definition_version, created_at::timestamptz FROM api_deployments WHERE namespace = $1 AND definition_id = $2 "#, @@ -158,131 +161,50 @@ impl ApiDeploymentRepo for DbApiDeploymentRepo { .map_err(|e| e.into()) } - async fn get_by_id_and_version( + #[when(sqlx::Sqlite -> get_by_id)] + async fn get_by_id_sqlite( &self, namespace: &str, definition_id: &str, - definition_version: &str, ) -> Result, RepoError> { sqlx::query_as::<_, ApiDeploymentRecord>( r#" SELECT namespace, site, host, subdomain, definition_id, definition_version, created_at FROM api_deployments - WHERE namespace = $1 AND definition_id = $2 AND definition_version = $3 - "#, - ) - .bind(namespace) - .bind(definition_id) - .bind(definition_version) - .fetch_all(self.db_pool.deref()) - .await - .map_err(|e| e.into()) - } - - async fn get_by_site(&self, site: &str) -> Result, RepoError> { - sqlx::query_as::<_, ApiDeploymentRecord>( - r#" - SELECT namespace, site, host, subdomain, definition_id, definition_version, created_at - FROM api_deployments - WHERE site = $1 + WHERE namespace = $1 AND definition_id = $2 "#, ) - .bind(site) - .fetch_all(self.db_pool.deref()) - .await - .map_err(|e| e.into()) - } - - async fn get_definitions_by_site( - &self, - site: &str, - ) -> Result, RepoError> { - sqlx::query_as::<_, ApiDefinitionRecord>( - r#" - SELECT api_definitions.namespace, api_definitions.id, api_definitions.version, api_definitions.draft, api_definitions.data, api_definitions.created_at - FROM api_deployments - JOIN api_definitions ON api_deployments.namespace = api_definitions.namespace AND api_deployments.definition_id = api_definitions.id AND api_deployments.definition_version = api_definitions.version - WHERE - api_deployments.site = $1 - "# - ) - .bind(site) + .bind(namespace) + .bind(definition_id) .fetch_all(self.db_pool.deref()) .await .map_err(|e| e.into()) } -} - -#[async_trait] -impl ApiDeploymentRepo for DbApiDeploymentRepo { - async fn create(&self, deployments: Vec) -> Result<(), RepoError> { - if !deployments.is_empty() { - let mut transaction = self.db_pool.begin().await?; - for deployment in deployments { - sqlx::query( - r#" - INSERT INTO api_deployments - (namespace, site, host, subdomain, definition_id, definition_version, created_at) - VALUES - ($1, $2, $3, $4, $5, $6, $7) - "#, - ) - .bind(deployment.namespace.clone()) - .bind(deployment.site.clone()) - .bind(deployment.host.clone()) - .bind(deployment.subdomain.clone()) - .bind(deployment.definition_id.clone()) - .bind(deployment.definition_version.clone()) - .bind(deployment.created_at) - .execute(&mut *transaction) - .await?; - } - transaction.commit().await?; - } - Ok(()) - } - async fn delete(&self, deployments: Vec) -> Result { - if !deployments.is_empty() { - let mut transaction = self.db_pool.begin().await?; - for deployment in deployments { - sqlx::query( - "DELETE FROM api_deployments WHERE namespace = $1 AND site = $2 AND definition_id = $3 AND definition_version = $4", - ) - .bind(deployment.namespace.clone()) - .bind(deployment.site.clone()) - .bind(deployment.definition_id.clone()) - .bind(deployment.definition_version.clone()) - .execute(&mut *transaction) - .await?; - } - transaction.commit().await?; - Ok(true) - } else { - Ok(false) - } - } - - async fn get_by_id( + #[when(sqlx::Postgres -> get_by_id_and_version)] + async fn get_by_id_and_version_postgres( &self, namespace: &str, definition_id: &str, + definition_version: &str, ) -> Result, RepoError> { sqlx::query_as::<_, ApiDeploymentRecord>( r#" SELECT namespace, site, host, subdomain, definition_id, definition_version, created_at::timestamptz FROM api_deployments - WHERE namespace = $1 AND definition_id = $2 + WHERE namespace = $1 AND definition_id = $2 AND definition_version = $3 "#, ) .bind(namespace) .bind(definition_id) + .bind(definition_version) .fetch_all(self.db_pool.deref()) .await .map_err(|e| e.into()) } - async fn get_by_id_and_version( + #[when(sqlx::Sqlite -> get_by_id_and_version)] + async fn get_by_id_and_version_sqlite( &self, namespace: &str, definition_id: &str, @@ -290,20 +212,24 @@ impl ApiDeploymentRepo for DbApiDeploymentRepo { ) -> Result, RepoError> { sqlx::query_as::<_, ApiDeploymentRecord>( r#" - SELECT namespace, site, host, subdomain, definition_id, definition_version, created_at::timestamptz + SELECT namespace, site, host, subdomain, definition_id, definition_version, created_at FROM api_deployments WHERE namespace = $1 AND definition_id = $2 AND definition_version = $3 "#, ) - .bind(namespace) - .bind(definition_id) - .bind(definition_version) - .fetch_all(self.db_pool.deref()) - .await - .map_err(|e| e.into()) + .bind(namespace) + .bind(definition_id) + .bind(definition_version) + .fetch_all(self.db_pool.deref()) + .await + .map_err(|e| e.into()) } - async fn get_by_site(&self, site: &str) -> Result, RepoError> { + #[when(sqlx::Postgres -> get_by_site)] + async fn get_by_site_postgres( + &self, + site: &str, + ) -> Result, RepoError> { sqlx::query_as::<_, ApiDeploymentRecord>( r#" SELECT namespace, site, host, subdomain, definition_id, definition_version, created_at::timestamptz @@ -318,7 +244,23 @@ impl ApiDeploymentRepo for DbApiDeploymentRepo { .map_err(|e| e.into()) } - async fn get_definitions_by_site( + #[when(sqlx::Sqlite -> get_by_site)] + async fn get_by_site_sqlite(&self, site: &str) -> Result, RepoError> { + sqlx::query_as::<_, ApiDeploymentRecord>( + r#" + SELECT namespace, site, host, subdomain, definition_id, definition_version, created_at + FROM api_deployments + WHERE site = $1 + "#, + ) + .bind(site) + .fetch_all(self.db_pool.deref()) + .await + .map_err(|e| e.into()) + } + + #[when(sqlx::Postgres -> get_definitions_by_site)] + async fn get_definitions_by_site_postgres( &self, site: &str, ) -> Result, RepoError> { @@ -336,4 +278,24 @@ impl ApiDeploymentRepo for DbApiDeploymentRepo { .await .map_err(|e| e.into()) } + + #[when(sqlx::Sqlite -> get_definitions_by_site)] + async fn get_definitions_by_site_sqlite( + &self, + site: &str, + ) -> Result, RepoError> { + sqlx::query_as::<_, ApiDefinitionRecord>( + r#" + SELECT api_definitions.namespace, api_definitions.id, api_definitions.version, api_definitions.draft, api_definitions.data, api_definitions.created_at + FROM api_deployments + JOIN api_definitions ON api_deployments.namespace = api_definitions.namespace AND api_deployments.definition_id = api_definitions.id AND api_deployments.definition_version = api_definitions.version + WHERE + api_deployments.site = $1 + "# + ) + .bind(site) + .fetch_all(self.db_pool.deref()) + .await + .map_err(|e| e.into()) + } } diff --git a/golem-worker-service-base/src/repo/mod.rs b/golem-worker-service-base/src/repo/mod.rs index edba60ef2..59b6a9c38 100644 --- a/golem-worker-service-base/src/repo/mod.rs +++ b/golem-worker-service-base/src/repo/mod.rs @@ -14,24 +14,3 @@ pub mod api_definition; pub mod api_deployment; - -use std::fmt::Display; - -#[derive(Debug)] -pub enum RepoError { - Internal(String), -} - -impl From for RepoError { - fn from(error: sqlx::Error) -> Self { - RepoError::Internal(error.to_string()) - } -} - -impl Display for RepoError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - RepoError::Internal(error) => write!(f, "{}", error), - } - } -} diff --git a/golem-worker-service-base/src/service/api_definition.rs b/golem-worker-service-base/src/service/api_definition.rs index 568b84f81..82f824748 100644 --- a/golem-worker-service-base/src/service/api_definition.rs +++ b/golem-worker-service-base/src/service/api_definition.rs @@ -20,16 +20,16 @@ use crate::api_definition::http::{ CompiledHttpApiDefinition, ComponentMetadataDictionary, HttpApiDefinition, HttpApiDefinitionRequest, RouteCompilationErrors, }; -use async_trait::async_trait; -use chrono::Utc; -use golem_service_base::model::{Component, VersionedComponentId}; -use tracing::{error, info}; - use crate::api_definition::{ApiDefinitionId, ApiVersion, HasGolemWorkerBindings}; use crate::repo::api_definition::ApiDefinitionRecord; use crate::repo::api_definition::ApiDefinitionRepo; use crate::repo::api_deployment::ApiDeploymentRepo; -use crate::repo::RepoError; +use async_trait::async_trait; +use chrono::Utc; +use golem_common::SafeDisplay; +use golem_service_base::model::{Component, VersionedComponentId}; +use golem_service_base::repo::RepoError; +use tracing::{error, info}; use super::api_definition_validator::{ApiDefinitionValidatorService, ValidationErrors}; use super::component::ComponentService; @@ -60,23 +60,33 @@ pub enum ApiDefinitionError { ApiDefinitionAlreadyExists(ApiDefinitionId), #[error("API definition deployed: {0}")] ApiDefinitionDeployed(String), + #[error("Internal repository error: {0}")] + InternalRepoError(RepoError), #[error("Internal error: {0}")] - InternalError(#[from] anyhow::Error), + Internal(String), } -impl ApiDefinitionError { - fn internal(error: E, context: C) -> Self - where - E: Display + Debug + Send + Sync + 'static, - C: Display + Send + Sync + 'static, - { - ApiDefinitionError::InternalError(anyhow::Error::msg(error).context(context)) - } -} +impl ApiDefinitionError {} impl From for ApiDefinitionError { fn from(error: RepoError) -> Self { - ApiDefinitionError::internal(error, "Repository error") + ApiDefinitionError::InternalRepoError(error) + } +} + +impl SafeDisplay for ApiDefinitionError { + fn to_safe_string(&self) -> String { + match self { + ApiDefinitionError::ValidationError(inner) => inner.to_safe_string(), + ApiDefinitionError::ComponentNotFoundError(_) => self.to_string(), + ApiDefinitionError::RibCompilationErrors(_) => self.to_string(), + ApiDefinitionError::ApiDefinitionNotFound(_) => self.to_string(), + ApiDefinitionError::ApiDefinitionNotDraft(_) => self.to_string(), + ApiDefinitionError::ApiDefinitionAlreadyExists(_) => self.to_string(), + ApiDefinitionError::ApiDefinitionDeployed(_) => self.to_string(), + ApiDefinitionError::InternalRepoError(inner) => inner.to_safe_string(), + ApiDefinitionError::Internal(_) => self.to_string(), + } } } @@ -266,7 +276,9 @@ where compiled_http_api_definition.clone(), created_at, ) - .map_err(|e| ApiDefinitionError::internal(e, "Failed to create API definition record"))?; + .map_err(|e| { + ApiDefinitionError::Internal(format!("Failed to create API definition record: {e}")) + })?; self.definition_repo.create(&record).await?; @@ -319,7 +331,9 @@ where compiled_http_api_definition.clone(), created_at, ) - .map_err(|e| ApiDefinitionError::internal(e, "Failed to create API definition record"))?; + .map_err(|e| { + ApiDefinitionError::Internal(format!("Failed to create API definition record: {e}")) + })?; self.definition_repo.update(&record).await?; @@ -342,7 +356,9 @@ where match value { Some(v) => { let definition = v.try_into().map_err(|e| { - ApiDefinitionError::internal(e, "Failed to convert API definition record") + ApiDefinitionError::Internal(format!( + "Failed to convert API definition record: {e}" + )) })?; Ok(Some(definition)) } @@ -399,7 +415,9 @@ where .map(|d| d.clone().try_into()) .collect::, _>>() .map_err(|e| { - ApiDefinitionError::internal(e, "Failed to convert API definition record") + ApiDefinitionError::Internal(format!( + "Failed to convert API definition record: {e}" + )) })?; Ok(values) @@ -423,7 +441,9 @@ where .map(|d| d.clone().try_into()) .collect::, _>>() .map_err(|e| { - ApiDefinitionError::internal(e, "Failed to convert API definition record") + ApiDefinitionError::Internal(format!( + "Failed to convert API definition record: {e}" + )) })?; Ok(values) @@ -432,16 +452,17 @@ where #[cfg(test)] mod tests { - use crate::repo::RepoError; use crate::service::api_definition::ApiDefinitionError; + use golem_common::{SafeDisplay, SafeString}; + use golem_service_base::repo::RepoError; #[test] pub fn test_repo_error_to_service_error() { let repo_err = RepoError::Internal("some sql error".to_string()); - let service_err: ApiDefinitionError = repo_err.into(); + let service_err: ApiDefinitionError = repo_err.into(); assert_eq!( - service_err.to_string(), - "Internal error: Repository error".to_string() + service_err.to_safe_string(), + "Internal repository error".to_string() ); } } diff --git a/golem-worker-service-base/src/service/api_definition_validator.rs b/golem-worker-service-base/src/service/api_definition_validator.rs index 1148ea6f0..ad47c7694 100644 --- a/golem-worker-service-base/src/service/api_definition_validator.rs +++ b/golem-worker-service-base/src/service/api_definition_validator.rs @@ -12,12 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. -use serde::{Deserialize, Serialize}; - +use golem_common::SafeDisplay; use golem_service_base::model::Component; +use serde::{Deserialize, Serialize}; +use std::fmt::{Display, Formatter}; // TODO; This is more specific to specific protocol validations -// There should be a separate validator for worker binding as it is a common to validation to all protocls +// There should be a separate validator for worker binding as it is a common to validation to all protocols pub trait ApiDefinitionValidatorService { fn validate( &self, @@ -27,8 +28,30 @@ pub trait ApiDefinitionValidatorService { } #[derive(Debug, Clone, PartialEq, Default, Serialize, Deserialize, thiserror::Error)] -// TODO: Fix this display impl. -#[error("Validation error: {errors:?}")] pub struct ValidationErrors { pub errors: Vec, } + +impl Display for ValidationErrors { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!( + f, + "Validation errors: {}", + self.errors + .iter() + .map(|e| e.to_string()) + .collect::>() + .join(", ") + ) + } +} + +impl SafeDisplay for ValidationErrors { + fn to_safe_string(&self) -> String { + self.errors + .iter() + .map(|e| e.to_safe_string()) + .collect::>() + .join(", ") + } +} diff --git a/golem-worker-service-base/src/service/api_deployment.rs b/golem-worker-service-base/src/service/api_deployment.rs index e8870f5b7..39e595e5a 100644 --- a/golem-worker-service-base/src/service/api_deployment.rs +++ b/golem-worker-service-base/src/service/api_deployment.rs @@ -31,9 +31,10 @@ use crate::http::router::{Router, RouterPattern}; use crate::repo::api_definition::ApiDefinitionRepo; use crate::repo::api_deployment::ApiDeploymentRecord; use crate::repo::api_deployment::ApiDeploymentRepo; -use crate::repo::RepoError; use crate::service::api_definition::ApiDefinitionIdWithVersion; use chrono::Utc; +use golem_common::SafeDisplay; +use golem_service_base::repo::RepoError; use std::fmt::{Debug, Display}; #[async_trait] @@ -49,7 +50,7 @@ pub trait ApiDeploymentService { ) -> Result<(), ApiDeploymentError>; // Example: A newer version of API definition is in dev site, and older version of the same definition-id is in prod site. - // Therefore Vec + // Therefore, Vec async fn get_by_id( &self, namespace: &Namespace, @@ -83,23 +84,37 @@ pub enum ApiDeploymentError { ApiDeploymentConflict(ApiSiteString), #[error("API deployment definitions conflict error: {0}")] ApiDefinitionsConflict(String), - #[error("Internal error: {0}")] - InternalError(#[from] anyhow::Error), + #[error("Internal repository error: {0}")] + InternalRepoError(RepoError), + #[error("Internal error: failed to convert {what}: {error}")] + InternalConversionError { what: String, error: String }, } impl ApiDeploymentError { - fn internal(error: E, context: C) -> Self - where - E: Display + Debug + Send + Sync + 'static, - C: Display + Send + Sync + 'static, - { - ApiDeploymentError::InternalError(anyhow::Error::msg(error).context(context)) + pub fn conversion_error(what: impl AsRef, error: String) -> Self { + Self::InternalConversionError { + what: what.as_ref().to_string(), + error, + } } } impl From for ApiDeploymentError { fn from(error: RepoError) -> Self { - ApiDeploymentError::internal(error, "Repository error") + ApiDeploymentError::InternalRepoError(error) + } +} + +impl SafeDisplay for ApiDeploymentError { + fn to_safe_string(&self) -> String { + match self { + ApiDeploymentError::ApiDefinitionNotFound(_, _) => self.to_string(), + ApiDeploymentError::ApiDeploymentNotFound(_, _) => self.to_string(), + ApiDeploymentError::ApiDeploymentConflict(_) => self.to_string(), + ApiDeploymentError::ApiDefinitionsConflict(_) => self.to_string(), + ApiDeploymentError::InternalRepoError(inner) => inner.to_safe_string(), + ApiDeploymentError::InternalConversionError { .. } => self.to_string(), + } } } @@ -262,10 +277,7 @@ where set_not_draft.push(api_definition_key.clone()); } let definition = record.try_into().map_err(|e| { - ApiDeploymentError::internal( - e, - "Failed to convert API definition record", - ) + ApiDeploymentError::conversion_error("API definition record", e) })?; definitions.push(definition); } @@ -405,9 +417,11 @@ where subdomain: deployment_record.subdomain, }; - let namespace: Namespace = deployment_record.namespace.try_into().map_err(|e| { - ApiDeploymentError::internal(e, "Failed to convert API deployment namespace") - })?; + let namespace: Namespace = deployment_record.namespace.try_into().map_err( + |e: >::Error| { + ApiDeploymentError::conversion_error("API deployment namespace", e.to_string()) + }, + )?; let api_definition_key = ApiDefinitionIdWithVersion { id: deployment_record.definition_id.into(), @@ -451,7 +465,7 @@ where let mut namespace: Option = None; - let mut created_at: Option> = None; + let mut created_at: Option> = None; for deployment_record in existing_deployment_records { if site.is_none() { @@ -462,9 +476,14 @@ where } if namespace.is_none() { - namespace = Some(deployment_record.namespace.try_into().map_err(|e| { - ApiDeploymentError::internal(e, "Failed to convert API deployment namespace") - })?); + namespace = Some(deployment_record.namespace.try_into().map_err( + |e: >::Error| { + ApiDeploymentError::conversion_error( + "API deployment namespace", + e.to_string(), + ) + }, + )?); } if created_at.is_none() || created_at.is_some_and(|t| t > deployment_record.created_at) @@ -502,9 +521,11 @@ where let mut values: Vec = vec![]; for record in records { - values.push(record.try_into().map_err(|e| { - ApiDeploymentError::internal(e, "Failed to convert API definition record") - })?); + values.push( + record.try_into().map_err(|e| { + ApiDeploymentError::conversion_error("API definition record", e) + })?, + ); } Ok(values) @@ -550,16 +571,17 @@ where #[cfg(test)] mod tests { - use crate::repo::RepoError; use crate::service::api_deployment::ApiDeploymentError; + use golem_common::SafeDisplay; + use golem_service_base::repo::RepoError; #[test] pub fn test_repo_error_to_service_error() { let repo_err = RepoError::Internal("some sql error".to_string()); let service_err: ApiDeploymentError = repo_err.into(); assert_eq!( - service_err.to_string(), - "Internal error: Repository error".to_string() + service_err.to_safe_string(), + "Internal repository error".to_string() ); } } diff --git a/golem-worker-service-base/src/service/component/default.rs b/golem-worker-service-base/src/service/component/default.rs index 4b81dfd69..f71a92bc1 100644 --- a/golem-worker-service-base/src/service/component/default.rs +++ b/golem-worker-service-base/src/service/component/default.rs @@ -5,7 +5,8 @@ use tonic::transport::Channel; use golem_api_grpc::proto::golem::component::v1::component_service_client::ComponentServiceClient; use golem_api_grpc::proto::golem::component::v1::{ - get_component_metadata_response, GetLatestComponentRequest, GetVersionedComponentRequest, + get_component_metadata_response, GetComponentMetadataResponse, GetLatestComponentRequest, + GetVersionedComponentRequest, }; use golem_common::client::{GrpcClient, GrpcClientConfig}; use golem_common::config::RetryConfig; @@ -59,6 +60,43 @@ impl RemoteComponentService { retry_config, } } + + fn process_metadata_response( + response: GetComponentMetadataResponse, + ) -> Result { + match response.result { + None => Err(ComponentServiceError::Internal( + "Empty response".to_string(), + )), + + Some(get_component_metadata_response::Result::Success(response)) => { + let component_view: Result = match response + .component + { + Some(component) => { + let component: Component = component.clone().try_into().map_err(|err| { + ComponentServiceError::Internal(format!( + "Response conversion error: {err}" + )) + })?; + Ok(component) + } + None => Err(ComponentServiceError::Internal( + "Empty component response".to_string(), + )), + }; + Ok(component_view?) + } + Some(get_component_metadata_response::Result::Error(error)) => Err(error.into()), + } + } + + fn is_retriable(error: &ComponentServiceError) -> bool { + matches!( + error, + ComponentServiceError::FailedGrpcStatus(_) | ComponentServiceError::FailedTransport(_) + ) + } } #[async_trait] @@ -66,14 +104,15 @@ impl ComponentService for RemoteComponentService where AuthCtx: IntoIterator + Clone + Send + Sync, { - async fn get_latest( + async fn get_by_version( &self, component_id: &ComponentId, + version: u64, metadata: &AuthCtx, ) -> ComponentResult { let value = with_retries( "component", - "get_latest", + "get_component", Some(component_id.to_string()), &self.retry_config, &(self.client.clone(), component_id.clone(), metadata.clone()), @@ -81,60 +120,36 @@ where Box::pin(async move { let response = client .call(move |client| { - let request = GetLatestComponentRequest { + let request = GetVersionedComponentRequest { component_id: Some(id.clone().into()), + version, }; + let request = with_metadata(request, metadata.clone()); - Box::pin(client.get_latest_component_metadata(request)) + Box::pin(client.get_component_metadata(request)) }) .await? .into_inner(); - match response.result { - None => Err(ComponentServiceError::internal("Empty response")), - Some(get_component_metadata_response::Result::Success(response)) => { - let component_view: Result< - golem_service_base::model::Component, - ComponentServiceError, - > = match response.component { - Some(component) => { - let component: golem_service_base::model::Component = - component.clone().try_into().map_err(|_| { - ComponentServiceError::internal( - "Response conversion error", - ) - })?; - Ok(component) - } - None => { - Err(ComponentServiceError::internal("Empty component response")) - } - }; - Ok(component_view?) - } - Some(get_component_metadata_response::Result::Error(error)) => { - Err(error.into()) - } - } + Self::process_metadata_response(response) }) }, - is_retriable, + Self::is_retriable, ) .await?; Ok(value) } - async fn get_by_version( + async fn get_latest( &self, component_id: &ComponentId, - version: u64, metadata: &AuthCtx, ) -> ComponentResult { let value = with_retries( "component", - "get_component", + "get_latest", Some(component_id.to_string()), &self.retry_config, &(self.client.clone(), component_id.clone(), metadata.clone()), @@ -142,58 +157,23 @@ where Box::pin(async move { let response = client .call(move |client| { - let request = GetVersionedComponentRequest { + let request = GetLatestComponentRequest { component_id: Some(id.clone().into()), - version, }; - let request = with_metadata(request, metadata.clone()); - Box::pin(client.get_component_metadata(request)) + Box::pin(client.get_latest_component_metadata(request)) }) .await? .into_inner(); - match response.result { - None => Err(ComponentServiceError::internal("Empty response")), - - Some(get_component_metadata_response::Result::Success(response)) => { - let component_view: Result< - golem_service_base::model::Component, - ComponentServiceError, - > = match response.component { - Some(component) => { - let component: golem_service_base::model::Component = - component.clone().try_into().map_err(|_| { - ComponentServiceError::internal( - "Response conversion error", - ) - })?; - Ok(component) - } - None => { - Err(ComponentServiceError::internal("Empty component response")) - } - }; - Ok(component_view?) - } - Some(get_component_metadata_response::Result::Error(error)) => { - Err(error.into()) - } - } + Self::process_metadata_response(response) }) }, - is_retriable, + Self::is_retriable, ) .await?; Ok(value) } } - -fn is_retriable(error: &ComponentServiceError) -> bool { - match error { - ComponentServiceError::Internal(error) => error.is::(), - _ => false, - } -} diff --git a/golem-worker-service-base/src/service/component/error.rs b/golem-worker-service-base/src/service/component/error.rs index db456b3a6..a6a01f604 100644 --- a/golem-worker-service-base/src/service/component/error.rs +++ b/golem-worker-service-base/src/service/component/error.rs @@ -4,6 +4,7 @@ use golem_api_grpc::proto::golem::worker::v1::{ worker_error, worker_execution_error, UnknownError, WorkerError as GrpcWorkerError, WorkerExecutionError, }; +use golem_common::SafeDisplay; // The dependents of golem-worker-service-base is expected // to have a component service internally that can depend on this base error @@ -19,28 +20,38 @@ pub enum ComponentServiceError { BadRequest(Vec), #[error("Already Exists: {0}")] AlreadyExists(String), - #[error(transparent)] - Internal(#[from] anyhow::Error), + #[error("Internal component service error: {0}")] + Internal(String), + #[error("Internal error: {0}")] + FailedGrpcStatus(Status), + #[error("Internal error: {0}")] + FailedTransport(tonic::transport::Error), } -impl ComponentServiceError { - pub fn internal(error: M) -> Self - where - M: std::fmt::Display + std::fmt::Debug + Send + Sync + 'static, - { - Self::Internal(anyhow::Error::msg(error)) +impl SafeDisplay for ComponentServiceError { + fn to_safe_string(&self) -> String { + match self { + ComponentServiceError::Unauthorized(_) => self.to_string(), + ComponentServiceError::Forbidden(_) => self.to_string(), + ComponentServiceError::NotFound(_) => self.to_string(), + ComponentServiceError::BadRequest(_) => self.to_string(), + ComponentServiceError::AlreadyExists(_) => self.to_string(), + ComponentServiceError::Internal(_) => self.to_string(), + ComponentServiceError::FailedGrpcStatus(_) => self.to_string(), + ComponentServiceError::FailedTransport(_) => self.to_string(), + } } } impl From for ComponentServiceError { fn from(status: Status) -> Self { - ComponentServiceError::Internal(status.into()) + ComponentServiceError::FailedGrpcStatus(status) } } impl From for ComponentServiceError { fn from(error: tonic::transport::Error) -> Self { - ComponentServiceError::Internal(error.into()) + ComponentServiceError::FailedTransport(error) } } @@ -53,10 +64,8 @@ impl From for Compo Some(Error::LimitExceeded(error)) => ComponentServiceError::Forbidden(error.error), Some(Error::NotFound(error)) => ComponentServiceError::NotFound(error.error), Some(Error::AlreadyExists(error)) => ComponentServiceError::AlreadyExists(error.error), - Some(Error::InternalError(error)) => { - ComponentServiceError::Internal(anyhow::Error::msg(error.error)) - } - None => ComponentServiceError::Internal(anyhow::Error::msg("Unknown error")), + Some(Error::InternalError(error)) => ComponentServiceError::Internal(error.error), + None => ComponentServiceError::Internal("Unknown error".to_string()), } } } @@ -96,6 +105,20 @@ impl From for worker_error::Error { })), }) } + ComponentServiceError::FailedGrpcStatus(status) => { + worker_error::Error::InternalError(WorkerExecutionError { + error: Some(worker_execution_error::Error::Unknown(UnknownError { + details: status.to_string(), + })), + }) + } + ComponentServiceError::FailedTransport(error) => { + worker_error::Error::InternalError(WorkerExecutionError { + error: Some(worker_execution_error::Error::Unknown(UnknownError { + details: error.to_string(), + })), + }) + } } } } diff --git a/golem-worker-service-base/src/service/http/http_api_definition_validator.rs b/golem-worker-service-base/src/service/http/http_api_definition_validator.rs index 0d9a7a3d0..3f569c1fe 100644 --- a/golem-worker-service-base/src/service/http/http_api_definition_validator.rs +++ b/golem-worker-service-base/src/service/http/http_api_definition_validator.rs @@ -1,8 +1,9 @@ use poem_openapi::Object; +use std::fmt::{Display, Formatter}; -use serde::{Deserialize, Serialize}; - +use golem_common::SafeDisplay; use golem_service_base::model::{Component, VersionedComponentId}; +use serde::{Deserialize, Serialize}; use crate::api_definition::http::{HttpApiDefinition, MethodPattern, Route}; @@ -29,6 +30,22 @@ impl RouteValidationError { } } +impl Display for RouteValidationError { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!( + f, + "RouteValidationError: method: {}, path: {}, component: {}, detail: {}", + self.method, self.path, self.component, self.detail + ) + } +} + +impl SafeDisplay for RouteValidationError { + fn to_safe_string(&self) -> String { + self.to_string() + } +} + #[derive(Clone)] pub struct HttpApiDefinitionValidator {} diff --git a/golem-worker-service-base/src/service/worker/default.rs b/golem-worker-service-base/src/service/worker/default.rs index f7608e379..866aa1e9b 100644 --- a/golem-worker-service-base/src/service/worker/default.rs +++ b/golem-worker-service-base/src/service/worker/default.rs @@ -330,7 +330,7 @@ where } => Err(err.into()), workerexecutor::v1::CreateWorkerResponse { .. } => Err("Empty response".into()), }, - WorkerServiceError::internal, + WorkerServiceError::InternalCallError, ) .await?; @@ -364,7 +364,7 @@ where { WorkerServiceError::WorkerNotFound(worker_id_err.clone()) } - _ => WorkerServiceError::internal(error), + _ => WorkerServiceError::InternalCallError(error), }, ) .await?; @@ -402,7 +402,7 @@ where } => Err(err.into()), workerexecutor::v1::DeleteWorkerResponse { .. } => Err("Empty response".into()), }, - WorkerServiceError::internal, + WorkerServiceError::InternalCallError, ) .await?; @@ -478,7 +478,7 @@ where } } }, - WorkerServiceError::internal, + WorkerServiceError::InternalCallError, ).await?; Ok(invoke_response) @@ -538,7 +538,7 @@ where } } }, - WorkerServiceError::internal, + WorkerServiceError::InternalCallError, ).await?; Ok(invoke_response) @@ -583,7 +583,7 @@ where } workerexecutor::v1::InvokeWorkerResponse { .. } => Err("Empty response".into()), }, - WorkerServiceError::internal, + WorkerServiceError::InternalCallError, ) .await?; Ok(()) @@ -637,7 +637,7 @@ where } } }, - WorkerServiceError::internal, + WorkerServiceError::InternalCallError, ) .await?; Ok(result) @@ -674,7 +674,7 @@ where } => Err(err.into()), workerexecutor::v1::InterruptWorkerResponse { .. } => Err("Empty response".into()), }, - WorkerServiceError::internal, + WorkerServiceError::InternalCallError, ) .await?; @@ -720,7 +720,7 @@ where } } }, - WorkerServiceError::internal, + WorkerServiceError::InternalCallError, ).await?; Ok(metadata) @@ -782,7 +782,7 @@ where } => Err(err.into()), workerexecutor::v1::ResumeWorkerResponse { .. } => Err("Empty response".into()), }, - WorkerServiceError::internal, + WorkerServiceError::InternalCallError, ) .await?; Ok(()) @@ -818,7 +818,7 @@ where } => Err(err.into()), workerexecutor::v1::UpdateWorkerResponse { .. } => Err("Empty response".into()), }, - WorkerServiceError::internal, + WorkerServiceError::InternalCallError, ) .await?; Ok(()) @@ -889,7 +889,7 @@ where } => Err(err.into()), workerexecutor::v1::GetOplogResponse { .. } => Err("Empty response".into()), }, - WorkerServiceError::internal, + WorkerServiceError::InternalCallError, ) .await } @@ -976,7 +976,7 @@ where } }).collect::, ResponseMapResult>>() }, - WorkerServiceError::internal, + WorkerServiceError::InternalCallError, ).await?; Ok(result.into_iter().flatten().collect()) @@ -1044,7 +1044,7 @@ where Err("Empty response".into()) } }, - WorkerServiceError::internal, + WorkerServiceError::InternalCallError, ) .await?; diff --git a/golem-worker-service-base/src/service/worker/error.rs b/golem-worker-service-base/src/service/worker/error.rs index 8e1b7825e..2b717a4c6 100644 --- a/golem-worker-service-base/src/service/worker/error.rs +++ b/golem-worker-service-base/src/service/worker/error.rs @@ -16,15 +16,16 @@ use golem_api_grpc::proto::golem::worker::v1::{ worker_error, worker_execution_error, UnknownError, WorkerError as GrpcWorkerError, }; use golem_common::model::{AccountId, ComponentId, WorkerId}; +use golem_common::SafeDisplay; use golem_service_base::model::{GolemError, VersionedComponentId}; use crate::service::component::ComponentServiceError; +use crate::service::worker::CallWorkerExecutorError; #[derive(Debug, thiserror::Error)] pub enum WorkerServiceError { #[error(transparent)] Component(#[from] ComponentServiceError), - // TODO: This should prob be a vec? #[error("Type checker error: {0}")] TypeChecker(String), #[error("Component not found: {0}")] @@ -36,17 +37,26 @@ pub enum WorkerServiceError { #[error("Worker not found: {0}")] WorkerNotFound(WorkerId), #[error("Internal error: {0}")] - Internal(#[from] anyhow::Error), + Internal(String), #[error(transparent)] Golem(GolemError), + #[error(transparent)] + InternalCallError(CallWorkerExecutorError), } -impl WorkerServiceError { - pub fn internal(error: M) -> Self - where - M: std::error::Error + Send + Sync + 'static, - { - Self::Internal(anyhow::Error::new(error)) +impl SafeDisplay for WorkerServiceError { + fn to_safe_string(&self) -> String { + match self { + WorkerServiceError::Component(inner) => inner.to_safe_string(), + WorkerServiceError::TypeChecker(_) => self.to_string(), + WorkerServiceError::VersionedComponentIdNotFound(_) => self.to_string(), + WorkerServiceError::ComponentNotFound(_) => self.to_string(), + WorkerServiceError::AccountIdNotFound(_) => self.to_string(), + WorkerServiceError::WorkerNotFound(_) => self.to_string(), + WorkerServiceError::Internal(_) => self.to_string(), + WorkerServiceError::Golem(inner) => inner.to_safe_string(), + WorkerServiceError::InternalCallError(inner) => inner.to_safe_string(), + } } } @@ -68,12 +78,19 @@ impl From for worker_error::Error { | WorkerServiceError::AccountIdNotFound(_) | WorkerServiceError::VersionedComponentIdNotFound(_) | WorkerServiceError::WorkerNotFound(_)) => worker_error::Error::NotFound(ErrorBody { - error: error.to_string(), + error: error.to_safe_string(), }), WorkerServiceError::Internal(_) => { worker_error::Error::InternalError(WorkerExecutionError { error: Some(worker_execution_error::Error::Unknown(UnknownError { - details: error.to_string(), + details: error.to_safe_string(), + })), + }) + } + WorkerServiceError::InternalCallError(_) => { + worker_error::Error::InternalError(WorkerExecutionError { + error: Some(worker_execution_error::Error::Unknown(UnknownError { + details: error.to_safe_string(), })), }) } diff --git a/golem-worker-service-base/src/service/worker/routing_logic.rs b/golem-worker-service-base/src/service/worker/routing_logic.rs index 5fd8c1abe..e5b1b030f 100644 --- a/golem-worker-service-base/src/service/worker/routing_logic.rs +++ b/golem-worker-service-base/src/service/worker/routing_logic.rs @@ -17,7 +17,6 @@ use std::fmt::Debug; use std::future::Future; use std::pin::Pin; -use anyhow::anyhow; use async_trait::async_trait; use tokio::task::JoinSet; use tokio::time::{sleep, Instant}; @@ -32,6 +31,7 @@ use golem_common::config::RetryConfig; use golem_common::model::{Pod, ShardId, TargetWorkerId, WorkerId}; use golem_common::retriable_error::IsRetriableError; use golem_common::retries::get_delay; +use golem_common::SafeDisplay; use golem_service_base::model::{GolemError, GolemErrorInvalidShardId, GolemErrorUnknown}; use golem_service_base::routing_table::{HasRoutingTableService, RoutingTableError}; @@ -322,7 +322,7 @@ impl From for ResponseMapResult { impl From<&'static str> for ResponseMapResult { fn from(error: &'static str) -> Self { - ResponseMapResult::Other(WorkerServiceError::Internal(anyhow!(error))) + ResponseMapResult::Other(WorkerServiceError::Internal(error.to_string())) } } @@ -429,6 +429,15 @@ pub enum CallWorkerExecutorError { FailedToConnectToPod(Status), } +impl SafeDisplay for CallWorkerExecutorError { + fn to_safe_string(&self) -> String { + match self { + CallWorkerExecutorError::FailedToGetRoutingTable(_) => self.to_string(), + CallWorkerExecutorError::FailedToConnectToPod(_) => self.to_string(), + } + } +} + pub struct CallWorkerExecutorErrorWithContext { error: CallWorkerExecutorError, pod: Option, diff --git a/golem-worker-service/src/api/api_definition.rs b/golem-worker-service/src/api/api_definition.rs index 32974bfdf..4b2e1f2ff 100644 --- a/golem-worker-service/src/api/api_definition.rs +++ b/golem-worker-service/src/api/api_definition.rs @@ -1,7 +1,7 @@ use std::result::Result; use std::sync::Arc; -use golem_common::recorded_http_api_request; +use golem_common::{recorded_http_api_request, safe}; use golem_service_base::api_tags::ApiTags; use golem_service_base::auth::{DefaultNamespace, EmptyAuthCtx}; use golem_worker_service_base::api::ApiEndpointError; @@ -53,7 +53,7 @@ impl RegisterApiDefinitionApi { let response = { let definition = get_api_definition(openapi.0).map_err(|e| { error!("Invalid Spec {}", e); - ApiEndpointError::bad_request(e) + ApiEndpointError::bad_request(safe(e)) })?; let result = self @@ -87,7 +87,7 @@ impl RegisterApiDefinitionApi { let definition: CoreHttpApiDefinitionRequest = payload .0 .try_into() - .map_err(ApiEndpointError::bad_request)?; + .map_err(|err| ApiEndpointError::bad_request(safe(err)))?; let result = self .create_api(&definition) @@ -125,14 +125,16 @@ impl RegisterApiDefinitionApi { let definition: CoreHttpApiDefinitionRequest = payload .0 .try_into() - .map_err(ApiEndpointError::bad_request)?; + .map_err(|err| ApiEndpointError::bad_request(safe(err)))?; if id.0 != definition.id { - Err(ApiEndpointError::bad_request("Unmatched url and body ids.")) + Err(ApiEndpointError::bad_request(safe( + "Unmatched url and body ids.".to_string(), + ))) } else if version.0 != definition.version { - Err(ApiEndpointError::bad_request( - "Unmatched url and body versions.", - )) + Err(ApiEndpointError::bad_request(safe( + "Unmatched url and body versions.".to_string(), + ))) } else { let result = self .definition_service @@ -186,9 +188,9 @@ impl RegisterApiDefinitionApi { .instrument(record.span.clone()) .await?; - let definition = data.ok_or(ApiEndpointError::not_found(format!( + let definition = data.ok_or(ApiEndpointError::not_found(safe(format!( "Can't find api definition with id {api_definition_id}, and version {api_version}" - )))?; + ))))?; let result = HttpApiDefinitionWithTypeInfo::from(definition); Ok(Json(result)) diff --git a/golem-worker-service/src/api/api_deployment.rs b/golem-worker-service/src/api/api_deployment.rs index e73b36da8..2cc887276 100644 --- a/golem-worker-service/src/api/api_deployment.rs +++ b/golem-worker-service/src/api/api_deployment.rs @@ -1,6 +1,6 @@ use std::sync::Arc; -use golem_common::recorded_http_api_request; +use golem_common::{recorded_http_api_request, safe}; use golem_service_base::api_tags::ApiTags; use golem_service_base::auth::DefaultNamespace; use golem_worker_service_base::api::ApiEndpointError; @@ -62,9 +62,9 @@ impl ApiDeploymentApi { .instrument(record.span.clone()) .await?; - let deployment = data.ok_or(ApiEndpointError::internal( - "Failed to verify the deployment", - ))?; + let deployment = data.ok_or(ApiEndpointError::internal(safe( + "Failed to verify the deployment".to_string(), + )))?; Ok(Json(deployment.into())) }; @@ -112,7 +112,9 @@ impl ApiDeploymentApi { .deployment_service .get_by_site(&ApiSiteString(site)) .await? - .ok_or(ApiEndpointError::not_found("Api deployment not found"))?; + .ok_or(ApiEndpointError::not_found(safe( + "Api deployment not found".to_string(), + )))?; Ok(Json(value.into())) }; From 2461028326c36281e7c78d30d8b8faa048be1417 Mon Sep 17 00:00:00 2001 From: Daniel Vigovszky Date: Thu, 10 Oct 2024 17:57:32 +0200 Subject: [PATCH 2/4] Publish fix (#1000) --- golem-client/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/golem-client/Cargo.toml b/golem-client/Cargo.toml index b0b221ee2..8811557a0 100644 --- a/golem-client/Cargo.toml +++ b/golem-client/Cargo.toml @@ -12,7 +12,7 @@ include = ["src/**/*", "Cargo.toml", "build.rs", "openapi/**/*"] [lib] [dependencies] -golem-common = { path = "../golem-common" } +golem-common = { path = "../golem-common", version = "0.0.0" } golem-wasm-ast = { workspace = true } golem-wasm-rpc = { workspace = true } From 88624de6677b3e20a33ed57a6f0f3f1ea0cdc7c0 Mon Sep 17 00:00:00 2001 From: Daniel Vigovszky Date: Fri, 11 Oct 2024 09:10:39 +0200 Subject: [PATCH 3/4] Fix publish order (#1002) --- Makefile.toml | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/Makefile.toml b/Makefile.toml index a27cda768..573094678 100644 --- a/Makefile.toml +++ b/Makefile.toml @@ -55,7 +55,9 @@ command = "cargo" args = ["build", "--workspace", "--all-targets"] [tasks.build-bins-non-ci] -condition = { env_not_set = ["CI"] } # on CI we always 'cargo make build' first so no need to recompile bins +condition = { env_not_set = [ + "CI", +] } # on CI we always 'cargo make build' first so no need to recompile bins run_task = "build-bins" [tasks.build-bins] @@ -106,7 +108,7 @@ command = "cargo" args = [ "build", "-p", - "golem-shard-manager", # NOTE: Not all projects are cross-compilable because of an openssl dependency + "golem-shard-manager", # NOTE: Not all projects are cross-compilable because of an openssl dependency "-p", "golem-worker-executor", "-p", @@ -322,7 +324,7 @@ args = [ "integration", "--", "--nocapture", - "--test-threads=1" + "--test-threads=1", ] ## ** CHECK-OPENAPI ** @@ -405,10 +407,10 @@ args = ["-v", "./target/golem-service.yaml", "./openapi/golem-service.yaml"] description = "Publishes packages to crates.io" dependencies = [ "build-release", - "publish-golem-client", "publish-golem-api-grpc", - "publish-golem-rib", "publish-golem-common", + "publish-golem-client", + "publish-golem-rib", "publish-golem-service-base", "publish-golem-test-framework", "publish-golem-cli", @@ -658,9 +660,7 @@ export RUST_BACKTRACE=1 [tasks.check-configs] description = "Generates configs from code and checks if it's committed" -dependencies = [ - "generate-configs" -] +dependencies = ["generate-configs"] script = ''' git diff --exit-code \ @@ -694,4 +694,4 @@ docker compose --project-directory log-tools/elastic stop description = "Stops and removes the elastic environment, including all data" script = ''' docker compose --project-directory log-tools/elastic down --volumes -''' \ No newline at end of file +''' From e998c5f991be6cfcde7c3eb5a527b18743654082 Mon Sep 17 00:00:00 2001 From: Daniel Vigovszky Date: Fri, 11 Oct 2024 10:17:45 +0200 Subject: [PATCH 4/4] Fix publish order (#1003) --- Makefile.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Makefile.toml b/Makefile.toml index 573094678..a980c14cb 100644 --- a/Makefile.toml +++ b/Makefile.toml @@ -408,9 +408,9 @@ description = "Publishes packages to crates.io" dependencies = [ "build-release", "publish-golem-api-grpc", + "publish-golem-rib", "publish-golem-common", "publish-golem-client", - "publish-golem-rib", "publish-golem-service-base", "publish-golem-test-framework", "publish-golem-cli",