From 154952c0df8a2d04e4c6050da74a61571ba6d0b5 Mon Sep 17 00:00:00 2001 From: Stanislas Polu Date: Fri, 26 Jul 2024 17:18:39 +0200 Subject: [PATCH] `front`/`core` support for Table parents (#6539) * core: add support for table timestamp, tags and parents * migration * WIP front work to upsert table with timestamp/tags/parents * front table upsert * fix --- core/bin/dust_api.rs | 11 ++- core/src/blocks/helpers.rs | 1 - core/src/databases/database.rs | 16 ++++ .../20240726_add_tables_parents.sql | 8 ++ core/src/stores/postgres.rs | 85 +++++++++++++++---- core/src/stores/store.rs | 12 ++- front/lib/api/tables.ts | 13 ++- front/lib/upsert_queue.ts | 5 +- .../[wId]/data_sources/[name]/tables/index.ts | 9 ++ .../w/[wId]/data_sources/[name]/tables/csv.ts | 13 ++- .../data-sources/[name]/tables/upsert.tsx | 6 ++ front/temporal/upsert_queue/activities.ts | 3 + types/src/front/lib/core_api.ts | 9 ++ 13 files changed, 167 insertions(+), 24 deletions(-) create mode 100644 core/src/stores/migrations/20240726_add_tables_parents.sql diff --git a/core/bin/dust_api.rs b/core/bin/dust_api.rs index 954de11b4f72..5096c5898257 100644 --- a/core/bin/dust_api.rs +++ b/core/bin/dust_api.rs @@ -27,7 +27,7 @@ use dust::{ run, sqlite_workers::client::{self, HEARTBEAT_INTERVAL_MS}, stores::{postgres, store}, - utils::{error_response, APIError, APIResponse, CoreRequestMakeSpan}, + utils::{self, error_response, APIError, APIResponse, CoreRequestMakeSpan}, }; use futures::future::try_join_all; use hyper::http::StatusCode; @@ -1925,6 +1925,9 @@ struct DatabasesTablesUpsertPayload { table_id: String, name: String, description: String, + timestamp: Option, + tags: Vec, + parents: Vec, } async fn tables_upsert( @@ -1942,6 +1945,12 @@ async fn tables_upsert( &payload.table_id, &payload.name, &payload.description, + match payload.timestamp { + Some(timestamp) => timestamp, + None => utils::now(), + }, + &payload.tags, + &payload.parents, ) .await { diff --git a/core/src/blocks/helpers.rs b/core/src/blocks/helpers.rs index 1c634e6ce107..a2bae25d1696 100644 --- a/core/src/blocks/helpers.rs +++ b/core/src/blocks/helpers.rs @@ -4,7 +4,6 @@ use anyhow::{anyhow, Result}; use hyper::body::Buf; use reqwest::StatusCode; use serde::{Deserialize, Serialize}; -use serde_json::Value; use std::io::prelude::*; use url::Url; use urlencoding::encode; diff --git a/core/src/databases/database.rs b/core/src/databases/database.rs index d11cc8f1bba6..52af1e363a12 100644 --- a/core/src/databases/database.rs +++ b/core/src/databases/database.rs @@ -144,6 +144,10 @@ pub struct Table { table_id: String, name: String, description: String, + timestamp: u64, + tags: Vec, + parents: Vec, + schema: Option, schema_stale_at: Option, } @@ -160,6 +164,9 @@ impl Table { table_id: &str, name: &str, description: &str, + timestamp: u64, + tags: Vec, + parents: Vec, schema: &Option, schema_stale_at: Option, ) -> Self { @@ -170,6 +177,9 @@ impl Table { table_id: table_id.to_string(), name: name.to_string(), description: description.to_string(), + timestamp, + tags, + parents, schema: schema.clone(), schema_stale_at, } @@ -193,6 +203,9 @@ impl Table { pub fn description(&self) -> &str { &self.description } + pub fn timestamp(&self) -> u64 { + self.timestamp + } pub fn schema_cached(&self) -> Option<&TableSchema> { self.schema.as_ref() } @@ -512,6 +525,9 @@ mod tests { "table_id", "test_dbml", "Test records for DBML rendering", + utils::now(), + vec![], + vec![], &Some(schema), None, ); diff --git a/core/src/stores/migrations/20240726_add_tables_parents.sql b/core/src/stores/migrations/20240726_add_tables_parents.sql new file mode 100644 index 000000000000..98cc0a537757 --- /dev/null +++ b/core/src/stores/migrations/20240726_add_tables_parents.sql @@ -0,0 +1,8 @@ +-- pre deploy +ALTER TABLE tables ADD COLUMN parents TEXT[] NOT NULL DEFAULT '{}'; +ALTER TABLE tables ADD COLUMN tags_array TEXT[] NOT NULL DEFAULT '{}'; +ALTER TABLE tables ADD COLUMN timestamp BIGINT; + +-- post deploy +UPDATE tables SET timestamp = created; +ALTER TABLE tables ALTER COLUMN timestamp SET NOT NULL; diff --git a/core/src/stores/postgres.rs b/core/src/stores/postgres.rs index 8c3d31f82585..98b24f3fdd95 100644 --- a/core/src/stores/postgres.rs +++ b/core/src/stores/postgres.rs @@ -2234,10 +2234,21 @@ impl Store for PostgresStore { table_id: &str, name: &str, description: &str, + timestamp: u64, + tags: &Vec, + parents: &Vec, ) -> Result { let project_id = project.project_id(); let data_source_id = data_source_id.to_string(); + let table_created = utils::now(); + let table_id = table_id.to_string(); + let table_name = name.to_string(); + let table_description = description.to_string(); + let table_timestamp = timestamp; + let table_tags = tags.clone(); + let table_parents = parents.clone(); + let pool = self.pool.clone(); let c = pool.get().await?; @@ -2254,18 +2265,13 @@ impl Store for PostgresStore { _ => unreachable!(), }; - let table_created = utils::now(); - - let table_id = table_id.to_string(); - let table_name = name.to_string(); - let table_description = description.to_string(); - // Upsert Table. let stmt = c .prepare( "INSERT INTO tables \ - (id, data_source, created, table_id, name, description) \ - VALUES (DEFAULT, $1, $2, $3, $4, $5) \ + (id, data_source, created, table_id, name, description, + timestamp, tags_array, parents) \ + VALUES (DEFAULT, $1, $2, $3, $4, $5, $6, $7, $8) \ ON CONFLICT (table_id, data_source) DO UPDATE \ SET name = EXCLUDED.name, description = EXCLUDED.description \ RETURNING id", @@ -2280,6 +2286,9 @@ impl Store for PostgresStore { &table_id, &table_name, &table_description, + &(table_timestamp as i64), + &table_tags, + &table_parents, ], ) .await?; @@ -2291,6 +2300,9 @@ impl Store for PostgresStore { &table_id, &table_name, &table_description, + timestamp, + table_tags, + table_parents, &None, None, )) @@ -2411,13 +2423,24 @@ impl Store for PostgresStore { let stmt = c .prepare( - "SELECT created, table_id, name, description, schema, schema_stale_at FROM tables \ + "SELECT created, table_id, name, description, timestamp, tags_array, parents, \ + schema, schema_stale_at FROM tables \ WHERE data_source = $1 AND table_id = $2 LIMIT 1", ) .await?; let r = c.query(&stmt, &[&data_source_row_id, &table_id]).await?; - let d: Option<(i64, String, String, String, Option, Option)> = match r.len() { + let d: Option<( + i64, + String, + String, + String, + Option, + Vec, + Vec, + Option, + Option, + )> = match r.len() { 0 => None, 1 => Some(( r[0].get(0), @@ -2426,13 +2449,26 @@ impl Store for PostgresStore { r[0].get(3), r[0].get(4), r[0].get(5), + r[0].get(6), + r[0].get(7), + r[0].get(8), )), _ => unreachable!(), }; match d { None => Ok(None), - Some((created, table_id, name, description, schema, schema_stale_at)) => { + Some(( + created, + table_id, + name, + description, + timestamp, + tags, + parents, + schema, + schema_stale_at, + )) => { let parsed_schema: Option = match schema { None => None, Some(schema) => { @@ -2450,6 +2486,12 @@ impl Store for PostgresStore { &table_id, &name, &description, + match timestamp { + None => created as u64, + Some(t) => t as u64, + }, + tags, + parents, &parsed_schema, schema_stale_at.map(|t| t as u64), ))) @@ -2487,8 +2529,9 @@ impl Store for PostgresStore { None => { let stmt = c .prepare( - "SELECT created, table_id, name, description, schema, schema_stale_at FROM tables \ - WHERE data_source = $1", + "SELECT created, table_id, name, description, timestamp, tags_array, \ + parents, schema, schema_stale_at FROM tables \ + WHERE data_source = $1", ) .await?; c.query(&stmt, &[&data_source_row_id]).await? @@ -2496,7 +2539,8 @@ impl Store for PostgresStore { Some((limit, offset)) => { let stmt = c .prepare( - "SELECT created, table_id, name, description, schema, schema_stale_at FROM tables \ + "SELECT created, table_id, name, description, timestamp, tags_array, \ + parents, schema, schema_stale_at FROM tables \ WHERE data_source = $1 LIMIT $2 OFFSET $3", ) .await?; @@ -2515,8 +2559,11 @@ impl Store for PostgresStore { let table_id: String = r.get(1); let name: String = r.get(2); let description: String = r.get(3); - let schema: Option = r.get(4); - let schema_stale_at: Option = r.get(5); + let timestamp: Option = r.get(4); + let tags: Vec = r.get(5); + let parents: Vec = r.get(6); + let schema: Option = r.get(7); + let schema_stale_at: Option = r.get(8); let parsed_schema: Option = match schema { None => None, @@ -2536,6 +2583,12 @@ impl Store for PostgresStore { &table_id, &name, &description, + match timestamp { + None => created as u64, + Some(t) => t as u64, + }, + tags, + parents, &parsed_schema, schema_stale_at.map(|t| t as u64), )) diff --git a/core/src/stores/store.rs b/core/src/stores/store.rs index aed74460e9c2..7cf748e5bd9e 100644 --- a/core/src/stores/store.rs +++ b/core/src/stores/store.rs @@ -196,6 +196,9 @@ pub trait Store { table_id: &str, name: &str, description: &str, + timestamp: u64, + tags: &Vec, + parents: &Vec, ) -> Result
; async fn update_table_schema( &self, @@ -444,6 +447,9 @@ pub const POSTGRES_TABLES: [&'static str; 14] = [ table_id TEXT NOT NULL, -- unique within datasource name TEXT NOT NULL, -- unique within datasource description TEXT NOT NULL, + timestamp BIGINT NOT NULL, + tags_array TEXT[] NOT NULL, + parents TEXT[] NOT NULL, schema TEXT, -- json, kept up-to-date automatically with the last insert schema_stale_at BIGINT, -- timestamp when the schema was last invalidated data_source BIGINT NOT NULL, @@ -451,7 +457,7 @@ pub const POSTGRES_TABLES: [&'static str; 14] = [ );", ]; -pub const SQL_INDEXES: [&'static str; 24] = [ +pub const SQL_INDEXES: [&'static str; 26] = [ "CREATE INDEX IF NOT EXISTS idx_specifications_project_created ON specifications (project, created);", "CREATE INDEX IF NOT EXISTS @@ -504,6 +510,10 @@ pub const SQL_INDEXES: [&'static str; 24] = [ idx_databases_table_ids_hash ON databases (table_ids_hash);", "CREATE UNIQUE INDEX IF NOT EXISTS idx_tables_data_source_table_id ON tables (data_source, table_id);", + "CREATE INDEX IF NOT EXISTS + idx_tables_tags_array ON tables USING GIN (tags_array);", + "CREATE INDEX IF NOT EXISTS + idx_tables_parents_array ON tables USING GIN (parents);", "CREATE UNIQUE INDEX IF NOT EXISTS idx_sqlite_workers_url ON sqlite_workers (url);", ]; diff --git a/front/lib/api/tables.ts b/front/lib/api/tables.ts index 377cbba89682..3b1b20b6856c 100644 --- a/front/lib/api/tables.ts +++ b/front/lib/api/tables.ts @@ -116,6 +116,9 @@ export async function upsertTableFromCsv({ tableName, tableDescription, tableId, + tableTimestamp, + tableTags, + tableParents, csv, truncate, }: { @@ -125,6 +128,9 @@ export async function upsertTableFromCsv({ tableName: string; tableDescription: string; tableId: string; + tableTimestamp: number | null; + tableTags: string[]; + tableParents: string[]; csv: string | null; truncate: boolean; }): Promise> { @@ -198,9 +204,12 @@ export async function upsertTableFromCsv({ const tableRes = await coreAPI.upsertTable({ projectId, dataSourceName, - description: tableDescription, - name: tableName, tableId, + name: tableName, + description: tableDescription, + timestamp: tableTimestamp, + tags: tableTags, + parents: tableParents, }); if (tableRes.isErr()) { diff --git a/front/lib/upsert_queue.ts b/front/lib/upsert_queue.ts index 83b9a98b5402..c7d10bf6b44d 100644 --- a/front/lib/upsert_queue.ts +++ b/front/lib/upsert_queue.ts @@ -44,9 +44,12 @@ export const EnqueueUpsertTable = t.type({ workspaceId: t.string, projectId: t.string, dataSourceName: t.string, + tableId: t.string, tableName: t.string, tableDescription: t.string, - tableId: t.string, + tableTimestamp: t.union([t.number, t.undefined, t.null]), + tableTags: t.union([t.array(t.string), t.undefined, t.null]), + tableParents: t.union([t.array(t.string), t.undefined, t.null]), csv: t.union([t.string, t.null]), truncate: t.boolean, }); diff --git a/front/pages/api/v1/w/[wId]/data_sources/[name]/tables/index.ts b/front/pages/api/v1/w/[wId]/data_sources/[name]/tables/index.ts index 07157668cf75..dfc5eb13c788 100644 --- a/front/pages/api/v1/w/[wId]/data_sources/[name]/tables/index.ts +++ b/front/pages/api/v1/w/[wId]/data_sources/[name]/tables/index.ts @@ -25,6 +25,9 @@ const UpsertDatabaseTableRequestBodySchema = t.type({ table_id: t.union([t.string, t.undefined]), name: t.string, description: t.string, + timestamp: t.union([t.number, t.undefined, t.null]), + tags: t.union([t.array(t.string), t.undefined, t.null]), + parents: t.union([t.array(t.string), t.undefined, t.null]), }); type UpsertTableResponseBody = { @@ -215,6 +218,9 @@ async function handler( name, description, table_id: maybeTableId, + timestamp, + tags, + parents, } = bodyValidation.right; const tableId = maybeTableId || generateLegacyModelSId(); @@ -260,6 +266,9 @@ async function handler( tableId, name, description, + timestamp: timestamp ?? null, + tags: tags || [], + parents: parents || [], }); if (upsertRes.isErr()) { diff --git a/front/pages/api/w/[wId]/data_sources/[name]/tables/csv.ts b/front/pages/api/w/[wId]/data_sources/[name]/tables/csv.ts index f670e18b16b2..464c950be752 100644 --- a/front/pages/api/w/[wId]/data_sources/[name]/tables/csv.ts +++ b/front/pages/api/w/[wId]/data_sources/[name]/tables/csv.ts @@ -24,6 +24,9 @@ export const UpsertTableFromCsvSchema = t.intersection([ t.type({ name: SlugifiedString, description: t.string, + timestamp: t.union([t.number, t.undefined, t.null]), + tags: t.union([t.array(t.string), t.undefined, t.null]), + parents: t.union([t.array(t.string), t.undefined, t.null]), truncate: t.boolean, async: t.union([t.boolean, t.undefined]), }), @@ -172,9 +175,12 @@ export async function handlePostTableCsvUpsertRequest( workspaceId: owner.sId, projectId: dataSource.dustAPIProjectId, dataSourceName, + tableId, tableName: name, tableDescription: description, - tableId, + tableTimestamp: bodyValidation.right.timestamp ?? null, + tableTags: bodyValidation.right.tags || [], + tableParents: bodyValidation.right.parents || [], csv: csv ?? null, truncate, }, @@ -205,9 +211,12 @@ export async function handlePostTableCsvUpsertRequest( auth, projectId: dataSource.dustAPIProjectId, dataSourceName, + tableId, tableName: name, tableDescription: description, - tableId, + tableTimestamp: bodyValidation.right.timestamp ?? null, + tableTags: bodyValidation.right.tags || [], + tableParents: bodyValidation.right.parents || [], csv: csv ?? null, truncate, }); diff --git a/front/pages/w/[wId]/builder/data-sources/[name]/tables/upsert.tsx b/front/pages/w/[wId]/builder/data-sources/[name]/tables/upsert.tsx index ebcfb3c3bc1c..f0c430ee4c3e 100644 --- a/front/pages/w/[wId]/builder/data-sources/[name]/tables/upsert.tsx +++ b/front/pages/w/[wId]/builder/data-sources/[name]/tables/upsert.tsx @@ -232,6 +232,9 @@ export default function TableUpsert({ description: description, csv: fileContent, tableId: loadTableId ?? undefined, + timestamp: null, + tags: [], + parents: [], truncate: true, async: false, }; @@ -240,6 +243,9 @@ export default function TableUpsert({ name: tableName, description: description, tableId: tableId, + timestamp: null, + tags: [], + parents: [], csv: undefined, truncate: false, async: false, diff --git a/front/temporal/upsert_queue/activities.ts b/front/temporal/upsert_queue/activities.ts index d0f82189e820..de8201d72b22 100644 --- a/front/temporal/upsert_queue/activities.ts +++ b/front/temporal/upsert_queue/activities.ts @@ -229,6 +229,9 @@ export async function upsertTableActivity( tableName: upsertQueueItem.tableName, tableDescription: upsertQueueItem.tableDescription, tableId: upsertQueueItem.tableId, + tableTimestamp: upsertQueueItem.tableTimestamp ?? null, + tableTags: upsertQueueItem.tableTags || [], + tableParents: upsertQueueItem.tableParents || [], csv: upsertQueueItem.csv, truncate: upsertQueueItem.truncate, }); diff --git a/types/src/front/lib/core_api.ts b/types/src/front/lib/core_api.ts index 21f1cc770610..e175784b9da8 100644 --- a/types/src/front/lib/core_api.ts +++ b/types/src/front/lib/core_api.ts @@ -1008,12 +1008,18 @@ export class CoreAPI { tableId, name, description, + timestamp, + tags, + parents, }: { projectId: string; dataSourceName: string; tableId: string; name: string; description: string; + timestamp: number | null; + tags: string[]; + parents: string[]; }): Promise> { const response = await this._fetchWithError( `${this._url}/projects/${encodeURIComponent( @@ -1028,6 +1034,9 @@ export class CoreAPI { table_id: tableId, name: name, description: description, + timestamp, + tags, + parents, }), } );