Skip to content

Commit

Permalink
adds a new version to the /vaults endpoint to list cache duration
Browse files Browse the repository at this point in the history
Signed-off-by: Bruno Calza <[email protected]>
  • Loading branch information
brunocalza committed Mar 21, 2024
1 parent 88693c9 commit f00fd21
Show file tree
Hide file tree
Showing 7 changed files with 176 additions and 20 deletions.
4 changes: 2 additions & 2 deletions lib/worker/src/db.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
mod publications;

pub use publications::{
create_job, delete_expired_job, find_job_cache_path_by_cid, get_cache_config,
is_namespace_owner, namespace_create, namespace_exists, pub_cids,
create_job, delete_expired_job, find_cache_config_by_vaults, find_job_cache_path_by_cid,
get_cache_config, is_namespace_owner, namespace_create, namespace_exists, pub_cids,
};
31 changes: 27 additions & 4 deletions lib/worker/src/db/publications.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,17 +70,40 @@ pub async fn is_namespace_owner(pool: &PgPool, ns: &str, owner: Address) -> Resu
Ok(!res.is_empty())
}

/// Returns cache config
pub async fn get_cache_config(pool: &PgPool, ns: &str, rel: &str) -> Result<Option<i64>> {
/// Returns cache config from a vault
pub async fn get_cache_config(pool: &PgPool, vault: &Vault) -> Result<Option<i64>> {
let (duration, ) : (Option<i64>, ) = sqlx::query_as("SELECT duration FROM cache_config JOIN namespaces ON ns_id = namespaces.id WHERE name = $1 AND relation = $2")
.bind(ns)
.bind(rel)
.bind(vault.namespace())
.bind(vault.relation())
.fetch_one(pool)
.await
.unwrap_or((None, ));
Ok(duration)
}

pub async fn find_cache_config_by_vaults(
pool: &PgPool,
vaults: Vec<Vault>,
) -> Result<Vec<(String, Option<i64>)>> {
let where_clause = (1..2 * vaults.len() + 1)
.step_by(2)
.map(|i| format!("(name = ${} AND relation = ${})", i, i + 1))
.collect::<Vec<String>>()
.join(" OR ");

let sql = format!("SELECT name || '.' || relation as vault, duration FROM namespaces JOIN cache_config ON ns_id = namespaces.id WHERE {}", where_clause);
let mut query = sqlx::query(sql.as_str());
for vault in vaults {
query = query.bind(vault.namespace()).bind(vault.relation());
}

query
.map(|row: PgRow| (row.get("vault"), row.get("duration")))
.fetch_all(pool)
.await
.map_err(basin_common::errors::Error::from)
}

// Unsets cache_path and expires_at
pub async fn delete_expired_job(pool: &PgPool) -> Result<()> {
sqlx::query!("UPDATE jobs SET expires_at = null, cache_path = null WHERE now() at time zone 'utc' >= expires_at;")
Expand Down
2 changes: 1 addition & 1 deletion lib/worker/src/domain/vault.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use regex::Regex;
use std::fmt;

#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct Vault {
ns: String,
rel: String,
Expand Down
102 changes: 89 additions & 13 deletions lib/worker/src/routes/vaults.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::domain::Vault;
use crate::gcs::GcsClient;
use crate::web3storage::Web3Storage;
use hex::ToHex;
use std::collections::HashMap;
use std::str::FromStr;

use basin_evm::EVMClient;
Expand Down Expand Up @@ -150,6 +151,82 @@ pub async fn find_vaults_by_account<E: EVMClient + 'static + std::marker::Sync>(
Ok(with_status(json(&vaults), StatusCode::OK))
}

pub async fn find_vaults_by_account_v2<E: EVMClient + 'static + std::marker::Sync>(
evm_client: E,
pool: PgPool,
params: FindVaultsByAccountParams,
) -> Result<impl warp::Reply, Infallible> {
let account = match Address::from_str(params.account.as_str()) {
Ok(v) => v,
Err(_) => {
return Ok(with_status(
json(&ErrorResponse {
error: "account is invalid".to_string(),
}),
StatusCode::BAD_REQUEST,
));
}
};

let vaults = match evm_client.list_pub(account).await {
Ok(vaults) => vaults
.into_iter()
.map(|s| Vault::from(s).unwrap())
.collect::<Vec<Vault>>(),
Err(err) => {
return Ok(with_status(
json(&ErrorResponse {
error: err.to_string(),
}),
StatusCode::BAD_REQUEST,
));
}
};

#[derive(Serialize)]
struct ResponseItem {
vault: String,
cache_duration: Option<i64>,
}

let mut response: Vec<ResponseItem> = Vec::new();
let rows = match db::find_cache_config_by_vaults(&pool, vaults.clone()).await {
Ok(v) => v,
Err(err) => {
log::error!("{}", err);
return Ok(with_status(
json(&ErrorResponse {
error: "error fetching the cache config".to_string(),
}),
StatusCode::INTERNAL_SERVER_ERROR,
));
}
};

let mut vault_duration_map: HashMap<String, Option<i64>> = HashMap::new();
for row in rows {
vault_duration_map.insert(row.0, row.1);
}

for vault in vaults {
let vault_name = vault.to_string();
let item = match vault_duration_map.get(&vault_name) {
Some(&duration) => ResponseItem {
vault: vault_name,
cache_duration: duration,
},
None => ResponseItem {
vault: vault_name,
cache_duration: None,
},
};

response.push(item);
}

Ok(with_status(json(&response), StatusCode::OK))
}

#[derive(Debug, Deserialize)]
pub struct FindVaultsByAccountParams {
account: String,
Expand Down Expand Up @@ -408,19 +485,18 @@ pub async fn write_event<W: Web3Storage>(
}
}

let cache_duration =
match db::get_cache_config(&pool, &vault.namespace(), &vault.relation()).await {
Ok(v) => v,
Err(err) => {
log::error!("{}", err);
return Ok(with_status(
json(&ErrorResponse {
error: "error fetching the cache config".to_string(),
}),
StatusCode::INTERNAL_SERVER_ERROR,
));
}
};
let cache_duration = match db::get_cache_config(&pool, &vault).await {
Ok(v) => v,
Err(err) => {
log::error!("{}", err);
return Ok(with_status(
json(&ErrorResponse {
error: "error fetching the cache config".to_string(),
}),
StatusCode::INTERNAL_SERVER_ERROR,
));
}
};

let filename = format!(
"{}/{}/{}-{}",
Expand Down
14 changes: 14 additions & 0 deletions lib/worker/src/startup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ mod api {
) -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone {
health()
.or(vaults_list(evm_client.clone()))
.or(vaults_list_v2(evm_client.clone(), db.clone()))
.or(vaults_create(evm_client, db.clone()))
.or(vaults_events_create(
db.clone(),
Expand Down Expand Up @@ -67,6 +68,19 @@ mod api {
.and_then(routes::find_vaults_by_account)
}

// GET /v2/vaults
pub fn vaults_list_v2<E: EVMClient + 'static + std::marker::Sync>(
evm_client: E,
db: PgPool,
) -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone {
warp::path!("v2" / "vaults")
.and(warp::get())
.and(with_evm_client(evm_client))
.and(with_db(db))
.and(warp::query::<FindVaultsByAccountParams>())
.and_then(routes::find_vaults_by_account_v2)
}

// POST /vaults/:id
pub fn vaults_create<E: EVMClient + 'static + std::marker::Sync>(
evm_client: E,
Expand Down
12 changes: 12 additions & 0 deletions lib/worker/tests/api/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,18 @@ impl TestApp {
.expect("Failed to execute request.")
}

pub async fn get_vaults_v2(&self) -> Response {
self.api_client
.get(&format!(
"{}/v2/vaults?account={:#x}",
&self.address,
self.account.address()
))
.send()
.await
.expect("Failed to execute request.")
}

pub async fn get_events_from_vaults(&self, vault: &str) -> Response {
self.api_client
.get(&format!("{}/vaults/{}/events", &self.address, vault))
Expand Down
31 changes: 31 additions & 0 deletions lib/worker/tests/api/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,37 @@ async fn list_vaults() {
assert_eq!(json!(["api.test", "api.test2"]), response);
}

#[tokio::test]
async fn list_vaults_v2() {
let app = spawn_app().await;

// setup
app.create_vault("api.test").await;
app.create_vault_with_cache("api.test2", 100).await;

// make request
let response = app
.get_vaults_v2()
.await
.text()
.await
.unwrap()
.parse::<serde_json::Value>()
.unwrap();
assert_eq!(
json!([
{
"vault": "api.test",
"cache_duration": null,
},
{
"vault": "api.test2",
"cache_duration": 100,
}]),
response
);
}

#[tokio::test]
async fn list_events() {
let app = spawn_app().await;
Expand Down

0 comments on commit f00fd21

Please sign in to comment.