From bd9b4f70250721096b8a88dc60ae047868bacf2c Mon Sep 17 00:00:00 2001 From: Stanislas Polu Date: Wed, 22 Nov 2023 17:00:36 +0100 Subject: [PATCH 1/4] core database: small nits and refactoring --- core/bin/dust_api.rs | 7 +- core/src/databases/database.rs | 38 +++--- core/src/stores/postgres.rs | 206 +++++++++++++++++---------------- core/src/stores/store.rs | 18 +-- 4 files changed, 139 insertions(+), 130 deletions(-) diff --git a/core/bin/dust_api.rs b/core/bin/dust_api.rs index ebd169d6e672..a0493d48d484 100644 --- a/core/bin/dust_api.rs +++ b/core/bin/dust_api.rs @@ -2076,7 +2076,7 @@ async fn databases_schema_retrieve( &format!("No database found for id `{}`", database_id), None, ), - Ok(Some(db)) => match db.get_schema(&project, state.store.clone()).await { + Ok(Some(db)) => match db.get_schema(state.store.clone()).await { Err(e) => error_response( StatusCode::INTERNAL_SERVER_ERROR, "internal_server_error", @@ -2125,10 +2125,7 @@ async fn databases_query_run( &format!("No database found for id `{}`", database_id), None, ), - Ok(Some(db)) => match db - .query(&project, state.store.clone(), &payload.query) - .await - { + Ok(Some(db)) => match db.query(state.store.clone(), &payload.query).await { Err(e) => error_response( StatusCode::INTERNAL_SERVER_ERROR, "internal_server_error", diff --git a/core/src/databases/database.rs b/core/src/databases/database.rs index 3e182bcd3c81..004bf2eed4bb 100644 --- a/core/src/databases/database.rs +++ b/core/src/databases/database.rs @@ -27,6 +27,7 @@ impl ToString for DatabaseType { #[derive(Debug, Serialize)] pub struct Database { + project: Project, created: u64, data_source_id: String, database_id: String, @@ -35,8 +36,15 @@ pub struct Database { } impl Database { - pub fn new(created: u64, data_source_id: &str, database_id: &str, name: &str) -> Self { + pub fn new( + project: &Project, + created: u64, + data_source_id: &str, + database_id: &str, + name: &str, + ) -> Self { Database { + project: project.clone(), created: created, data_source_id: data_source_id.to_string(), database_id: database_id.to_string(), @@ -45,15 +53,11 @@ impl Database { } } - pub async fn get_schema( - &self, - project: &Project, - store: Box, - ) -> Result { + pub async fn get_schema(&self, store: Box) -> Result { match self.db_type { DatabaseType::REMOTE => Err(anyhow!("Remote DB not implemented.")), DatabaseType::LOCAL => { - let rows = self.get_rows(project, store).await?; + let rows = self.get_rows(store).await?; let schema = rows .par_iter() @@ -70,9 +74,13 @@ impl Database { } } + // pub async fn batch_upsert_rows( + // &self, + + // ) + pub async fn create_in_memory_sqlite_conn( &self, - project: &Project, store: Box, ) -> Result { match self.db_type { @@ -82,14 +90,14 @@ impl Database { DatabaseType::LOCAL => { let time_build_db_start = utils::now(); - let schema = self.get_schema(project, store.clone()).await?; + let schema = self.get_schema(store.clone()).await?; utils::done(&format!( "DSSTRUCTSTAT Finished retrieving schema: duration={}ms", utils::now() - time_build_db_start )); let time_get_rows_start = utils::now(); - let rows_by_table = match self.get_rows(project, store.clone()).await { + let rows_by_table = match self.get_rows(store.clone()).await { Ok(rows) => Ok(rows .into_iter() .map(|(table, rows)| (table.table_id().to_string(), rows)) @@ -171,16 +179,13 @@ impl Database { pub async fn query( &self, - project: &Project, store: Box, query: &str, ) -> Result<(Vec, TableSchema)> { match self.db_type { DatabaseType::REMOTE => Err(anyhow!("Remote DB not implemented.")), DatabaseType::LOCAL => { - let conn = self - .create_in_memory_sqlite_conn(project, store.clone()) - .await?; + let conn = self.create_in_memory_sqlite_conn(store.clone()).await?; let time_query_start = utils::now(); @@ -270,11 +275,10 @@ impl Database { pub async fn get_rows( &self, - project: &Project, store: Box, ) -> Result)>> { let (tables, _) = store - .list_databases_tables(&project, &self.data_source_id, &self.database_id, None) + .list_databases_tables(&self.project, &self.data_source_id, &self.database_id, None) .await?; // Concurrently retrieve table rows. @@ -287,7 +291,7 @@ impl Database { async move { let (rows, _) = store .list_database_rows( - project, + &self.project, self.data_source_id.as_str(), self.database_id.as_str(), table.table_id(), diff --git a/core/src/stores/postgres.rs b/core/src/stores/postgres.rs index cca2eafc9b15..765b8f949243 100644 --- a/core/src/stores/postgres.rs +++ b/core/src/stores/postgres.rs @@ -197,7 +197,7 @@ impl Store for PostgresStore { let pool = self.pool.clone(); let c = pool.get().await?; - // Check that the dataset_id and hash exist + // Check that the dataset_id and hash exist. let r = c .query( "SELECT id, created FROM datasets @@ -218,7 +218,7 @@ impl Store for PostgresStore { } let (row_id, created) = d.unwrap(); - // Retrieve data points through datasets_joins + // Retrieve data points through datasets_joins. let stmt = c .prepare( "SELECT datasets_joins.point_idx, datasets_points.json \ @@ -784,7 +784,7 @@ impl Store for PostgresStore { match block { None => { - // Retrieve data points through datasets_joins + // Retrieve data points through datasets_joins. let stmt = c .prepare( "SELECT \ @@ -835,7 +835,7 @@ impl Store for PostgresStore { match block { None => (), Some((block_type, block_name)) => { - // Retrieve data points through datasets_joins for one block + // Retrieve data points through datasets_joins for one block. let stmt = c .prepare( "SELECT \ @@ -1227,7 +1227,7 @@ impl Store for PostgresStore { let tx = c.transaction().await?; - // get current tags and put them into a set + // Get current tags and put them into a set. let current_tags_result = tx .query( "SELECT tags_array FROM data_sources_documents WHERE data_source = $1 \ @@ -1243,7 +1243,7 @@ impl Store for PostgresStore { } }; - // update the set of tags based on the add and remove lists + // Update the set of tags based on the add and remove lists. for tag in add_tags { current_tags.insert(tag.clone()); } @@ -1292,8 +1292,8 @@ impl Store for PostgresStore { _ => unreachable!(), }; - // the `created` timestamp of the version specified by `latest_hash` - // (if `latest_hash` is `None`, then this is the latest version's `created` timestamp) + // The `created` timestamp of the version specified by `latest_hash` + // (if `latest_hash` is `None`, then this is the latest version's `created` timestamp). let latest_hash_created: i64 = match latest_hash { Some(latest_hash) => { let stmt = c @@ -1660,7 +1660,7 @@ impl Store for PostgresStore { let chunk_count: i64 = r.get(9); let tags = if remove_system_tags { - // remove tags that are prefixed with the system tag prefix + // Remove tags that are prefixed with the system tag prefix. tags.into_iter() .filter(|t| !t.starts_with(DATA_SOURCE_DOCUMENT_SYSTEM_TAG_PREFIX)) .collect() @@ -1788,7 +1788,7 @@ impl Store for PostgresStore { let pool = self.pool.clone(); let c = pool.get().await?; - // get the data source row id + // Get the data source row id. let stmt = c .prepare( "SELECT id FROM data_sources WHERE project = $1 AND data_source_id = $2 LIMIT 1", @@ -1820,7 +1820,13 @@ impl Store for PostgresStore { ) .await?; - Ok(Database::new(db_created, &data_source_id, &db_id, &db_name)) + Ok(Database::new( + &Project::new_from_id(project_id), + db_created, + &data_source_id, + &db_id, + &db_name, + )) } async fn load_database( @@ -1855,6 +1861,7 @@ impl Store for PostgresStore { match d { None => Ok(None), Some((created, database_id, name, data_source_id)) => Ok(Some(Database::new( + &Project::new_from_id(project_id), created as u64, &data_source_id, &database_id, @@ -1908,6 +1915,7 @@ impl Store for PostgresStore { let data_source_id: String = row.get(3); Ok(Database::new( + &Project::new_from_id(project_id), created as u64, &data_source_id, &database_id, @@ -2148,88 +2156,88 @@ impl Store for PostgresStore { Ok((tables, total)) } - async fn upsert_database_row( - &self, - project: &Project, - data_source_id: &str, - database_id: &str, - table_id: &str, - row_id: &str, - content: &Value, - ) -> Result { - let project_id = project.project_id(); - let data_source_id = data_source_id.to_string(); - let database_id = database_id.to_string(); - let table_id = table_id.to_string(); - - let pool = self.pool.clone(); - let c = pool.get().await?; - - // get the data source row id - let stmt = c - .prepare( - "SELECT id FROM data_sources WHERE project = $1 AND data_source_id = $2 LIMIT 1", - ) - .await?; - - let r = c.query(&stmt, &[&project_id, &data_source_id]).await?; - let data_source_row_id: i64 = match r.len() { - 0 => panic!("Unknown DataSource: {}", data_source_id), - 1 => r[0].get(0), - _ => unreachable!(), - }; - - // get the database row id - let stmt = c - .prepare("SELECT id FROM databases WHERE data_source = $1 AND database_id = $2 LIMIT 1") - .await?; - - let r = c.query(&stmt, &[&data_source_row_id, &database_id]).await?; - - let database_row_id: i64 = match r.len() { - 0 => panic!("Unknown Database: {}", database_id), - 1 => r[0].get(0), - _ => unreachable!(), - }; - - // get the table row id - let stmt = c - .prepare( - "SELECT id FROM databases_tables WHERE database = $1 AND table_id = $2 LIMIT 1", - ) - .await?; - let r = c.query(&stmt, &[&database_row_id, &table_id]).await?; - let table_row_id: i64 = match r.len() { - 0 => panic!("Unknown Table: {}", table_id), - 1 => r[0].get(0), - _ => unreachable!(), - }; - - let row_created = utils::now(); - - let row_id = row_id.to_string(); - let row_data = content.to_string(); - - // Upsert Database Row. - let stmt = c - .prepare( - "INSERT INTO databases_rows \ - (id, database_table, created, row_id, content) \ - VALUES (DEFAULT, $1, $2, $3, $4, $5) \ - ON CONFLICT (row_id, database_table) DO UPDATE \ - SET content = EXCLUDED.content \ - RETURNING id", - ) - .await?; - - c.query_one( - &stmt, - &[&table_row_id, &(row_created as i64), &row_id, &row_data], - ) - .await?; - - Ok(DatabaseRow::new(row_created, Some(row_id), content)) - } + // async fn upsert_database_row( + // &self, + // project: &Project, + // data_source_id: &str, + // database_id: &str, + // table_id: &str, + // row_id: &str, + // content: &Value, + // ) -> Result { + // let project_id = project.project_id(); + // let data_source_id = data_source_id.to_string(); + // let database_id = database_id.to_string(); + // let table_id = table_id.to_string(); + + // let pool = self.pool.clone(); + // let c = pool.get().await?; + + // // get the data source row id + // let stmt = c + // .prepare( + // "SELECT id FROM data_sources WHERE project = $1 AND data_source_id = $2 LIMIT 1", + // ) + // .await?; + + // let r = c.query(&stmt, &[&project_id, &data_source_id]).await?; + // let data_source_row_id: i64 = match r.len() { + // 0 => panic!("Unknown DataSource: {}", data_source_id), + // 1 => r[0].get(0), + // _ => unreachable!(), + // }; + + // // get the database row id + // let stmt = c + // .prepare("SELECT id FROM databases WHERE data_source = $1 AND database_id = $2 LIMIT 1") + // .await?; + + // let r = c.query(&stmt, &[&data_source_row_id, &database_id]).await?; + + // let database_row_id: i64 = match r.len() { + // 0 => panic!("Unknown Database: {}", database_id), + // 1 => r[0].get(0), + // _ => unreachable!(), + // }; + + // // get the table row id + // let stmt = c + // .prepare( + // "SELECT id FROM databases_tables WHERE database = $1 AND table_id = $2 LIMIT 1", + // ) + // .await?; + // let r = c.query(&stmt, &[&database_row_id, &table_id]).await?; + // let table_row_id: i64 = match r.len() { + // 0 => panic!("Unknown Table: {}", table_id), + // 1 => r[0].get(0), + // _ => unreachable!(), + // }; + + // let row_created = utils::now(); + + // let row_id = row_id.to_string(); + // let row_data = content.to_string(); + + // // Upsert Database Row. + // let stmt = c + // .prepare( + // "INSERT INTO databases_rows \ + // (id, database_table, created, row_id, content) \ + // VALUES (DEFAULT, $1, $2, $3, $4, $5) \ + // ON CONFLICT (row_id, database_table) DO UPDATE \ + // SET content = EXCLUDED.content \ + // RETURNING id", + // ) + // .await?; + + // c.query_one( + // &stmt, + // &[&table_row_id, &(row_created as i64), &row_id, &row_data], + // ) + // .await?; + + // Ok(DatabaseRow::new(row_created, Some(row_id), content)) + // } async fn load_database_row( &self, @@ -2406,7 +2414,7 @@ impl Store for PostgresStore { let pool = self.pool.clone(); let mut c = pool.get().await?; - // get the data source row id + // Get the data source row id. let stmt = c .prepare( "SELECT id FROM data_sources WHERE project = $1 AND data_source_id = $2 LIMIT 1", @@ -2419,7 +2427,7 @@ impl Store for PostgresStore { _ => unreachable!(), }; - // get the database row id + // Get the database row id. let stmt = c .prepare("SELECT id FROM databases WHERE data_source = $1 AND database_id = $2 LIMIT 1") .await?; @@ -2430,7 +2438,7 @@ impl Store for PostgresStore { _ => unreachable!(), }; - // get the table row id + // Get the table row id. let stmt = c .prepare( "SELECT id FROM databases_tables WHERE database = $1 AND table_id = $2 LIMIT 1", @@ -2443,10 +2451,10 @@ impl Store for PostgresStore { _ => unreachable!(), }; - // start transaction + // Start transaction. let c = c.transaction().await?; - // truncate table if required + // Truncate table if required. if truncate { let stmt = c .prepare("DELETE FROM databases_rows WHERE database_table = $1") @@ -2454,7 +2462,7 @@ impl Store for PostgresStore { c.execute(&stmt, &[&table_row_id]).await?; } - // prepare insertion/updation statement + // Prepare insertion/updation statement. let stmt = c .prepare( "INSERT INTO databases_rows \ diff --git a/core/src/stores/store.rs b/core/src/stores/store.rs index 97f1ec9c66c5..d6cfa0a9abb4 100644 --- a/core/src/stores/store.rs +++ b/core/src/stores/store.rs @@ -196,15 +196,15 @@ pub trait Store { database_id: &str, limit_offset: Option<(usize, usize)>, ) -> Result<(Vec, usize)>; - async fn upsert_database_row( - &self, - project: &Project, - data_source_id: &str, - database_id: &str, - table_id: &str, - row_id: &str, - content: &Value, - ) -> Result; + // async fn upsert_database_row( + // &self, + // project: &Project, + // data_source_id: &str, + // database_id: &str, + // table_id: &str, + // row_id: &str, + // content: &Value, + // ) -> Result; async fn batch_upsert_database_rows( &self, project: &Project, From b8617d7d8afdc324c3ed84cd573db619038e540a Mon Sep 17 00:00:00 2001 From: Stanislas Polu Date: Wed, 22 Nov 2023 17:01:11 +0100 Subject: [PATCH 2/4] nit --- core/src/databases/database.rs | 5 ----- 1 file changed, 5 deletions(-) diff --git a/core/src/databases/database.rs b/core/src/databases/database.rs index 004bf2eed4bb..bff04be3c5f8 100644 --- a/core/src/databases/database.rs +++ b/core/src/databases/database.rs @@ -74,11 +74,6 @@ impl Database { } } - // pub async fn batch_upsert_rows( - // &self, - - // ) - pub async fn create_in_memory_sqlite_conn( &self, store: Box, From fb92ca248c6755940762bad54cd624d518f00ca0 Mon Sep 17 00:00:00 2001 From: Stanislas Polu Date: Wed, 22 Nov 2023 17:02:57 +0100 Subject: [PATCH 3/4] fix --- core/src/blocks/database.rs | 2 +- core/src/blocks/database_schema.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/blocks/database.rs b/core/src/blocks/database.rs index 81245160d9e4..66d2e415a7a8 100644 --- a/core/src/blocks/database.rs +++ b/core/src/blocks/database.rs @@ -112,7 +112,7 @@ impl Block for Database { ))?, }; - let (rows, schema) = match database.query(&project, env.store.clone(), &query).await { + let (rows, schema) = match database.query(env.store.clone(), &query).await { Ok(r) => r, Err(e) => Err(anyhow!( "Error querying database `{}` in data source `{}`: {}", diff --git a/core/src/blocks/database_schema.rs b/core/src/blocks/database_schema.rs index e9a9e139de6c..7022f6ae00ca 100644 --- a/core/src/blocks/database_schema.rs +++ b/core/src/blocks/database_schema.rs @@ -104,7 +104,7 @@ async fn get_database_schema( ))?, }; - match database.get_schema(&project, env.store.clone()).await { + match database.get_schema(env.store.clone()).await { Ok(s) => Ok(s), Err(e) => Err(anyhow!( "Error getting schema for database `{}` in data source `{}`: {}", From 2f55443fa828575332a6ea43141ae543a3f194c4 Mon Sep 17 00:00:00 2001 From: Stanislas Polu Date: Wed, 22 Nov 2023 17:07:53 +0100 Subject: [PATCH 4/4] remove commented dead code --- core/src/stores/postgres.rs | 83 ------------------------------------- core/src/stores/store.rs | 9 ---- 2 files changed, 92 deletions(-) diff --git a/core/src/stores/postgres.rs b/core/src/stores/postgres.rs index 765b8f949243..30ab2b702455 100644 --- a/core/src/stores/postgres.rs +++ b/core/src/stores/postgres.rs @@ -2156,89 +2156,6 @@ impl Store for PostgresStore { Ok((tables, total)) } - // async fn upsert_database_row( - // &self, - // project: &Project, - // data_source_id: &str, - // database_id: &str, - // table_id: &str, - // row_id: &str, - // content: &Value, - // ) -> Result { - // let project_id = project.project_id(); - // let data_source_id = data_source_id.to_string(); - // let database_id = database_id.to_string(); - // let table_id = table_id.to_string(); - - // let pool = self.pool.clone(); - // let c = pool.get().await?; - - // // get the data source row id - // let stmt = c - // .prepare( - // "SELECT id FROM data_sources WHERE project = $1 AND data_source_id = $2 LIMIT 1", - // ) - // .await?; - - // let r = c.query(&stmt, &[&project_id, &data_source_id]).await?; - // let data_source_row_id: i64 = match r.len() { - // 0 => panic!("Unknown DataSource: {}", data_source_id), - // 1 => r[0].get(0), - // _ => unreachable!(), - // }; - - // // get the database row id - // let stmt = c - // .prepare("SELECT id FROM databases WHERE data_source = $1 AND database_id = $2 LIMIT 1") - // .await?; - - // let r = c.query(&stmt, &[&data_source_row_id, &database_id]).await?; - - // let database_row_id: i64 = match r.len() { - // 0 => panic!("Unknown Database: {}", database_id), - // 1 => r[0].get(0), - // _ => unreachable!(), - // }; - - // // get the table row id - // let stmt = c - // .prepare( - // "SELECT id FROM databases_tables WHERE database = $1 AND table_id = $2 LIMIT 1", - // ) - // .await?; - // let r = c.query(&stmt, &[&database_row_id, &table_id]).await?; - // let table_row_id: i64 = match r.len() { - // 0 => panic!("Unknown Table: {}", table_id), - // 1 => r[0].get(0), - // _ => unreachable!(), - // }; - - // let row_created = utils::now(); - - // let row_id = row_id.to_string(); - // let row_data = content.to_string(); - - // // Upsert Database Row. - // let stmt = c - // .prepare( - // "INSERT INTO databases_rows \ - // (id, database_table, created, row_id, content) \ - // VALUES (DEFAULT, $1, $2, $3, $4, $5) \ - // ON CONFLICT (row_id, database_table) DO UPDATE \ - // SET content = EXCLUDED.content \ - // RETURNING id", - // ) - // .await?; - - // c.query_one( - // &stmt, - // &[&table_row_id, &(row_created as i64), &row_id, &row_data], - // ) - // .await?; - - // Ok(DatabaseRow::new(row_created, Some(row_id), content)) - // } - async fn load_database_row( &self, project: &Project, diff --git a/core/src/stores/store.rs b/core/src/stores/store.rs index d6cfa0a9abb4..d6e322c6c320 100644 --- a/core/src/stores/store.rs +++ b/core/src/stores/store.rs @@ -196,15 +196,6 @@ pub trait Store { database_id: &str, limit_offset: Option<(usize, usize)>, ) -> Result<(Vec, usize)>; - // async fn upsert_database_row( - // &self, - // project: &Project, - // data_source_id: &str, - // database_id: &str, - // table_id: &str, - // row_id: &str, - // content: &Value, - // ) -> Result; async fn batch_upsert_database_rows( &self, project: &Project,