From f00fd21c1321134eeeac32fb06c2f34fd1b5c15e Mon Sep 17 00:00:00 2001 From: Bruno Calza Date: Thu, 21 Mar 2024 16:10:33 -0300 Subject: [PATCH] adds a new version to the /vaults endpoint to list cache duration Signed-off-by: Bruno Calza --- lib/worker/src/db.rs | 4 +- lib/worker/src/db/publications.rs | 31 +++++++-- lib/worker/src/domain/vault.rs | 2 +- lib/worker/src/routes/vaults.rs | 102 ++++++++++++++++++++++++++---- lib/worker/src/startup.rs | 14 ++++ lib/worker/tests/api/helpers.rs | 12 ++++ lib/worker/tests/api/http.rs | 31 +++++++++ 7 files changed, 176 insertions(+), 20 deletions(-) diff --git a/lib/worker/src/db.rs b/lib/worker/src/db.rs index a216540..4d61344 100644 --- a/lib/worker/src/db.rs +++ b/lib/worker/src/db.rs @@ -1,6 +1,6 @@ mod publications; pub use publications::{ - create_job, delete_expired_job, find_job_cache_path_by_cid, get_cache_config, - is_namespace_owner, namespace_create, namespace_exists, pub_cids, + create_job, delete_expired_job, find_cache_config_by_vaults, find_job_cache_path_by_cid, + get_cache_config, is_namespace_owner, namespace_create, namespace_exists, pub_cids, }; diff --git a/lib/worker/src/db/publications.rs b/lib/worker/src/db/publications.rs index 962052f..5b389cb 100644 --- a/lib/worker/src/db/publications.rs +++ b/lib/worker/src/db/publications.rs @@ -70,17 +70,40 @@ pub async fn is_namespace_owner(pool: &PgPool, ns: &str, owner: Address) -> Resu Ok(!res.is_empty()) } -/// Returns cache config -pub async fn get_cache_config(pool: &PgPool, ns: &str, rel: &str) -> Result> { +/// Returns cache config from a vault +pub async fn get_cache_config(pool: &PgPool, vault: &Vault) -> Result> { let (duration, ) : (Option, ) = sqlx::query_as("SELECT duration FROM cache_config JOIN namespaces ON ns_id = namespaces.id WHERE name = $1 AND relation = $2") - .bind(ns) - .bind(rel) + .bind(vault.namespace()) + .bind(vault.relation()) .fetch_one(pool) .await .unwrap_or((None, )); Ok(duration) } +pub async fn find_cache_config_by_vaults( + pool: &PgPool, + vaults: Vec, +) -> Result)>> { + let where_clause = (1..2 * vaults.len() + 1) + .step_by(2) + .map(|i| format!("(name = ${} AND relation = ${})", i, i + 1)) + .collect::>() + .join(" OR "); + + let sql = format!("SELECT name || '.' || relation as vault, duration FROM namespaces JOIN cache_config ON ns_id = namespaces.id WHERE {}", where_clause); + let mut query = sqlx::query(sql.as_str()); + for vault in vaults { + query = query.bind(vault.namespace()).bind(vault.relation()); + } + + query + .map(|row: PgRow| (row.get("vault"), row.get("duration"))) + .fetch_all(pool) + .await + .map_err(basin_common::errors::Error::from) +} + // Unsets cache_path and expires_at pub async fn delete_expired_job(pool: &PgPool) -> Result<()> { sqlx::query!("UPDATE jobs SET expires_at = null, cache_path = null WHERE now() at time zone 'utc' >= expires_at;") diff --git a/lib/worker/src/domain/vault.rs b/lib/worker/src/domain/vault.rs index fddc2d5..3ca1160 100644 --- a/lib/worker/src/domain/vault.rs +++ b/lib/worker/src/domain/vault.rs @@ -1,7 +1,7 @@ use regex::Regex; use std::fmt; -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct Vault { ns: String, rel: String, diff --git a/lib/worker/src/routes/vaults.rs b/lib/worker/src/routes/vaults.rs index ad9e304..868c3fd 100644 --- a/lib/worker/src/routes/vaults.rs +++ b/lib/worker/src/routes/vaults.rs @@ -5,6 +5,7 @@ use crate::domain::Vault; use crate::gcs::GcsClient; use crate::web3storage::Web3Storage; use hex::ToHex; +use std::collections::HashMap; use std::str::FromStr; use basin_evm::EVMClient; @@ -150,6 +151,82 @@ pub async fn find_vaults_by_account( Ok(with_status(json(&vaults), StatusCode::OK)) } +pub async fn find_vaults_by_account_v2( + evm_client: E, + pool: PgPool, + params: FindVaultsByAccountParams, +) -> Result { + let account = match Address::from_str(params.account.as_str()) { + Ok(v) => v, + Err(_) => { + return Ok(with_status( + json(&ErrorResponse { + error: "account is invalid".to_string(), + }), + StatusCode::BAD_REQUEST, + )); + } + }; + + let vaults = match evm_client.list_pub(account).await { + Ok(vaults) => vaults + .into_iter() + .map(|s| Vault::from(s).unwrap()) + .collect::>(), + Err(err) => { + return Ok(with_status( + json(&ErrorResponse { + error: err.to_string(), + }), + StatusCode::BAD_REQUEST, + )); + } + }; + + #[derive(Serialize)] + struct ResponseItem { + vault: String, + cache_duration: Option, + } + + let mut response: Vec = Vec::new(); + let rows = match db::find_cache_config_by_vaults(&pool, vaults.clone()).await { + Ok(v) => v, + Err(err) => { + log::error!("{}", err); + return Ok(with_status( + json(&ErrorResponse { + error: "error fetching the cache config".to_string(), + }), + StatusCode::INTERNAL_SERVER_ERROR, + )); + } + }; + + let mut vault_duration_map: HashMap> = HashMap::new(); + for row in rows { + vault_duration_map.insert(row.0, row.1); + } + + for vault in vaults { + let vault_name = vault.to_string(); + let item = match vault_duration_map.get(&vault_name) { + Some(&duration) => ResponseItem { + vault: vault_name, + cache_duration: duration, + }, + None => ResponseItem { + vault: vault_name, + cache_duration: None, + }, + }; + + response.push(item); + } + + Ok(with_status(json(&response), StatusCode::OK)) +} + #[derive(Debug, Deserialize)] pub struct FindVaultsByAccountParams { account: String, @@ -408,19 +485,18 @@ pub async fn write_event( } } - let cache_duration = - match db::get_cache_config(&pool, &vault.namespace(), &vault.relation()).await { - Ok(v) => v, - Err(err) => { - log::error!("{}", err); - return Ok(with_status( - json(&ErrorResponse { - error: "error fetching the cache config".to_string(), - }), - StatusCode::INTERNAL_SERVER_ERROR, - )); - } - }; + let cache_duration = match db::get_cache_config(&pool, &vault).await { + Ok(v) => v, + Err(err) => { + log::error!("{}", err); + return Ok(with_status( + json(&ErrorResponse { + error: "error fetching the cache config".to_string(), + }), + StatusCode::INTERNAL_SERVER_ERROR, + )); + } + }; let filename = format!( "{}/{}/{}-{}", diff --git a/lib/worker/src/startup.rs b/lib/worker/src/startup.rs index 52aca05..70de4cd 100644 --- a/lib/worker/src/startup.rs +++ b/lib/worker/src/startup.rs @@ -39,6 +39,7 @@ mod api { ) -> impl Filter + Clone { health() .or(vaults_list(evm_client.clone())) + .or(vaults_list_v2(evm_client.clone(), db.clone())) .or(vaults_create(evm_client, db.clone())) .or(vaults_events_create( db.clone(), @@ -67,6 +68,19 @@ mod api { .and_then(routes::find_vaults_by_account) } + // GET /v2/vaults + pub fn vaults_list_v2( + evm_client: E, + db: PgPool, + ) -> impl Filter + Clone { + warp::path!("v2" / "vaults") + .and(warp::get()) + .and(with_evm_client(evm_client)) + .and(with_db(db)) + .and(warp::query::()) + .and_then(routes::find_vaults_by_account_v2) + } + // POST /vaults/:id pub fn vaults_create( evm_client: E, diff --git a/lib/worker/tests/api/helpers.rs b/lib/worker/tests/api/helpers.rs index 6f0e11e..9620816 100644 --- a/lib/worker/tests/api/helpers.rs +++ b/lib/worker/tests/api/helpers.rs @@ -145,6 +145,18 @@ impl TestApp { .expect("Failed to execute request.") } + pub async fn get_vaults_v2(&self) -> Response { + self.api_client + .get(&format!( + "{}/v2/vaults?account={:#x}", + &self.address, + self.account.address() + )) + .send() + .await + .expect("Failed to execute request.") + } + pub async fn get_events_from_vaults(&self, vault: &str) -> Response { self.api_client .get(&format!("{}/vaults/{}/events", &self.address, vault)) diff --git a/lib/worker/tests/api/http.rs b/lib/worker/tests/api/http.rs index 03e7e26..e5a5b42 100644 --- a/lib/worker/tests/api/http.rs +++ b/lib/worker/tests/api/http.rs @@ -37,6 +37,37 @@ async fn list_vaults() { assert_eq!(json!(["api.test", "api.test2"]), response); } +#[tokio::test] +async fn list_vaults_v2() { + let app = spawn_app().await; + + // setup + app.create_vault("api.test").await; + app.create_vault_with_cache("api.test2", 100).await; + + // make request + let response = app + .get_vaults_v2() + .await + .text() + .await + .unwrap() + .parse::() + .unwrap(); + assert_eq!( + json!([ + { + "vault": "api.test", + "cache_duration": null, + }, + { + "vault": "api.test2", + "cache_duration": 100, + }]), + response + ); +} + #[tokio::test] async fn list_events() { let app = spawn_app().await;