Skip to content

Commit

Permalink
front/core support for Table parents (#6539)
Browse files Browse the repository at this point in the history
* core: add support for table timestamp, tags and parents

* migration

* WIP front work to upsert table with timestamp/tags/parents

* front table upsert

* fix
  • Loading branch information
spolu authored Jul 26, 2024
1 parent 3329ec7 commit 154952c
Show file tree
Hide file tree
Showing 13 changed files with 167 additions and 24 deletions.
11 changes: 10 additions & 1 deletion core/bin/dust_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use dust::{
run,
sqlite_workers::client::{self, HEARTBEAT_INTERVAL_MS},
stores::{postgres, store},
utils::{error_response, APIError, APIResponse, CoreRequestMakeSpan},
utils::{self, error_response, APIError, APIResponse, CoreRequestMakeSpan},
};
use futures::future::try_join_all;
use hyper::http::StatusCode;
Expand Down Expand Up @@ -1925,6 +1925,9 @@ struct DatabasesTablesUpsertPayload {
table_id: String,
name: String,
description: String,
timestamp: Option<u64>,
tags: Vec<String>,
parents: Vec<String>,
}

async fn tables_upsert(
Expand All @@ -1942,6 +1945,12 @@ async fn tables_upsert(
&payload.table_id,
&payload.name,
&payload.description,
match payload.timestamp {
Some(timestamp) => timestamp,
None => utils::now(),
},
&payload.tags,
&payload.parents,
)
.await
{
Expand Down
1 change: 0 additions & 1 deletion core/src/blocks/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use anyhow::{anyhow, Result};
use hyper::body::Buf;
use reqwest::StatusCode;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::io::prelude::*;
use url::Url;
use urlencoding::encode;
Expand Down
16 changes: 16 additions & 0 deletions core/src/databases/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,10 @@ pub struct Table {
table_id: String,
name: String,
description: String,
timestamp: u64,
tags: Vec<String>,
parents: Vec<String>,

schema: Option<TableSchema>,
schema_stale_at: Option<u64>,
}
Expand All @@ -160,6 +164,9 @@ impl Table {
table_id: &str,
name: &str,
description: &str,
timestamp: u64,
tags: Vec<String>,
parents: Vec<String>,
schema: &Option<TableSchema>,
schema_stale_at: Option<u64>,
) -> Self {
Expand All @@ -170,6 +177,9 @@ impl Table {
table_id: table_id.to_string(),
name: name.to_string(),
description: description.to_string(),
timestamp,
tags,
parents,
schema: schema.clone(),
schema_stale_at,
}
Expand All @@ -193,6 +203,9 @@ impl Table {
pub fn description(&self) -> &str {
&self.description
}
/// Returns the table's timestamp as stored.
/// Set from the upsert payload when provided, otherwise defaulted to
/// `utils::now()` by the upsert path; for pre-migration rows it is
/// backfilled from the row's `created` column.
pub fn timestamp(&self) -> u64 {
self.timestamp
}
pub fn schema_cached(&self) -> Option<&TableSchema> {
self.schema.as_ref()
}
Expand Down Expand Up @@ -512,6 +525,9 @@ mod tests {
"table_id",
"test_dbml",
"Test records for DBML rendering",
utils::now(),
vec![],
vec![],
&Some(schema),
None,
);
Expand Down
8 changes: 8 additions & 0 deletions core/src/stores/migrations/20240726_add_tables_parents.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
-- pre deploy
-- New array columns get a '{}' default so rows inserted by code that does
-- not yet populate them remain valid during rollout.
ALTER TABLE tables ADD COLUMN parents TEXT[] NOT NULL DEFAULT '{}';
ALTER TABLE tables ADD COLUMN tags_array TEXT[] NOT NULL DEFAULT '{}';
-- timestamp is added nullable first; it is backfilled and tightened to
-- NOT NULL in the post-deploy step below.
ALTER TABLE tables ADD COLUMN timestamp BIGINT;

-- post deploy
-- Backfill existing rows from their creation time, then enforce NOT NULL
-- once every row has a value.
UPDATE tables SET timestamp = created;
ALTER TABLE tables ALTER COLUMN timestamp SET NOT NULL;
85 changes: 69 additions & 16 deletions core/src/stores/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2234,10 +2234,21 @@ impl Store for PostgresStore {
table_id: &str,
name: &str,
description: &str,
timestamp: u64,
tags: &Vec<String>,
parents: &Vec<String>,
) -> Result<Table> {
let project_id = project.project_id();
let data_source_id = data_source_id.to_string();

let table_created = utils::now();
let table_id = table_id.to_string();
let table_name = name.to_string();
let table_description = description.to_string();
let table_timestamp = timestamp;
let table_tags = tags.clone();
let table_parents = parents.clone();

let pool = self.pool.clone();
let c = pool.get().await?;

Expand All @@ -2254,18 +2265,13 @@ impl Store for PostgresStore {
_ => unreachable!(),
};

let table_created = utils::now();

let table_id = table_id.to_string();
let table_name = name.to_string();
let table_description = description.to_string();

// Upsert Table.
let stmt = c
.prepare(
"INSERT INTO tables \
(id, data_source, created, table_id, name, description) \
VALUES (DEFAULT, $1, $2, $3, $4, $5) \
(id, data_source, created, table_id, name, description,
timestamp, tags_array, parents) \
VALUES (DEFAULT, $1, $2, $3, $4, $5, $6, $7, $8) \
ON CONFLICT (table_id, data_source) DO UPDATE \
SET name = EXCLUDED.name, description = EXCLUDED.description \
RETURNING id",
Expand All @@ -2280,6 +2286,9 @@ impl Store for PostgresStore {
&table_id,
&table_name,
&table_description,
&(table_timestamp as i64),
&table_tags,
&table_parents,
],
)
.await?;
Expand All @@ -2291,6 +2300,9 @@ impl Store for PostgresStore {
&table_id,
&table_name,
&table_description,
timestamp,
table_tags,
table_parents,
&None,
None,
))
Expand Down Expand Up @@ -2411,13 +2423,24 @@ impl Store for PostgresStore {

let stmt = c
.prepare(
"SELECT created, table_id, name, description, schema, schema_stale_at FROM tables \
"SELECT created, table_id, name, description, timestamp, tags_array, parents, \
schema, schema_stale_at FROM tables \
WHERE data_source = $1 AND table_id = $2 LIMIT 1",
)
.await?;
let r = c.query(&stmt, &[&data_source_row_id, &table_id]).await?;

let d: Option<(i64, String, String, String, Option<String>, Option<i64>)> = match r.len() {
let d: Option<(
i64,
String,
String,
String,
Option<i64>,
Vec<String>,
Vec<String>,
Option<String>,
Option<i64>,
)> = match r.len() {
0 => None,
1 => Some((
r[0].get(0),
Expand All @@ -2426,13 +2449,26 @@ impl Store for PostgresStore {
r[0].get(3),
r[0].get(4),
r[0].get(5),
r[0].get(6),
r[0].get(7),
r[0].get(8),
)),
_ => unreachable!(),
};

match d {
None => Ok(None),
Some((created, table_id, name, description, schema, schema_stale_at)) => {
Some((
created,
table_id,
name,
description,
timestamp,
tags,
parents,
schema,
schema_stale_at,
)) => {
let parsed_schema: Option<TableSchema> = match schema {
None => None,
Some(schema) => {
Expand All @@ -2450,6 +2486,12 @@ impl Store for PostgresStore {
&table_id,
&name,
&description,
match timestamp {
None => created as u64,
Some(t) => t as u64,
},
tags,
parents,
&parsed_schema,
schema_stale_at.map(|t| t as u64),
)))
Expand Down Expand Up @@ -2487,16 +2529,18 @@ impl Store for PostgresStore {
None => {
let stmt = c
.prepare(
"SELECT created, table_id, name, description, schema, schema_stale_at FROM tables \
WHERE data_source = $1",
"SELECT created, table_id, name, description, timestamp, tags_array, \
parents, schema, schema_stale_at FROM tables \
WHERE data_source = $1",
)
.await?;
c.query(&stmt, &[&data_source_row_id]).await?
}
Some((limit, offset)) => {
let stmt = c
.prepare(
"SELECT created, table_id, name, description, schema, schema_stale_at FROM tables \
"SELECT created, table_id, name, description, timestamp, tags_array, \
parents, schema, schema_stale_at FROM tables \
WHERE data_source = $1 LIMIT $2 OFFSET $3",
)
.await?;
Expand All @@ -2515,8 +2559,11 @@ impl Store for PostgresStore {
let table_id: String = r.get(1);
let name: String = r.get(2);
let description: String = r.get(3);
let schema: Option<String> = r.get(4);
let schema_stale_at: Option<i64> = r.get(5);
let timestamp: Option<i64> = r.get(4);
let tags: Vec<String> = r.get(5);
let parents: Vec<String> = r.get(6);
let schema: Option<String> = r.get(7);
let schema_stale_at: Option<i64> = r.get(8);

let parsed_schema: Option<TableSchema> = match schema {
None => None,
Expand All @@ -2536,6 +2583,12 @@ impl Store for PostgresStore {
&table_id,
&name,
&description,
match timestamp {
None => created as u64,
Some(t) => t as u64,
},
tags,
parents,
&parsed_schema,
schema_stale_at.map(|t| t as u64),
))
Expand Down
12 changes: 11 additions & 1 deletion core/src/stores/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,9 @@ pub trait Store {
table_id: &str,
name: &str,
description: &str,
timestamp: u64,
tags: &Vec<String>,
parents: &Vec<String>,
) -> Result<Table>;
async fn update_table_schema(
&self,
Expand Down Expand Up @@ -444,14 +447,17 @@ pub const POSTGRES_TABLES: [&'static str; 14] = [
table_id TEXT NOT NULL, -- unique within datasource
name TEXT NOT NULL, -- unique within datasource
description TEXT NOT NULL,
timestamp BIGINT NOT NULL,
tags_array TEXT[] NOT NULL,
parents TEXT[] NOT NULL,
schema TEXT, -- json, kept up-to-date automatically with the last insert
schema_stale_at BIGINT, -- timestamp when the schema was last invalidated
data_source BIGINT NOT NULL,
FOREIGN KEY(data_source) REFERENCES data_sources(id)
);",
];

pub const SQL_INDEXES: [&'static str; 24] = [
pub const SQL_INDEXES: [&'static str; 26] = [
"CREATE INDEX IF NOT EXISTS
idx_specifications_project_created ON specifications (project, created);",
"CREATE INDEX IF NOT EXISTS
Expand Down Expand Up @@ -504,6 +510,10 @@ pub const SQL_INDEXES: [&'static str; 24] = [
idx_databases_table_ids_hash ON databases (table_ids_hash);",
"CREATE UNIQUE INDEX IF NOT EXISTS
idx_tables_data_source_table_id ON tables (data_source, table_id);",
"CREATE INDEX IF NOT EXISTS
idx_tables_tags_array ON tables USING GIN (tags_array);",
"CREATE INDEX IF NOT EXISTS
idx_tables_parents_array ON tables USING GIN (parents);",
"CREATE UNIQUE INDEX IF NOT EXISTS
idx_sqlite_workers_url ON sqlite_workers (url);",
];
Expand Down
13 changes: 11 additions & 2 deletions front/lib/api/tables.ts
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,9 @@ export async function upsertTableFromCsv({
tableName,
tableDescription,
tableId,
tableTimestamp,
tableTags,
tableParents,
csv,
truncate,
}: {
Expand All @@ -125,6 +128,9 @@ export async function upsertTableFromCsv({
tableName: string;
tableDescription: string;
tableId: string;
tableTimestamp: number | null;
tableTags: string[];
tableParents: string[];
csv: string | null;
truncate: boolean;
}): Promise<Result<{ table: CoreAPITable }, TableOperationError>> {
Expand Down Expand Up @@ -198,9 +204,12 @@ export async function upsertTableFromCsv({
const tableRes = await coreAPI.upsertTable({
projectId,
dataSourceName,
description: tableDescription,
name: tableName,
tableId,
name: tableName,
description: tableDescription,
timestamp: tableTimestamp,
tags: tableTags,
parents: tableParents,
});

if (tableRes.isErr()) {
Expand Down
5 changes: 4 additions & 1 deletion front/lib/upsert_queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,12 @@ export const EnqueueUpsertTable = t.type({
workspaceId: t.string,
projectId: t.string,
dataSourceName: t.string,
tableId: t.string,
tableName: t.string,
tableDescription: t.string,
tableId: t.string,
tableTimestamp: t.union([t.number, t.undefined, t.null]),
tableTags: t.union([t.array(t.string), t.undefined, t.null]),
tableParents: t.union([t.array(t.string), t.undefined, t.null]),
csv: t.union([t.string, t.null]),
truncate: t.boolean,
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ const UpsertDatabaseTableRequestBodySchema = t.type({
table_id: t.union([t.string, t.undefined]),
name: t.string,
description: t.string,
timestamp: t.union([t.number, t.undefined, t.null]),
tags: t.union([t.array(t.string), t.undefined, t.null]),
parents: t.union([t.array(t.string), t.undefined, t.null]),
});

type UpsertTableResponseBody = {
Expand Down Expand Up @@ -215,6 +218,9 @@ async function handler(
name,
description,
table_id: maybeTableId,
timestamp,
tags,
parents,
} = bodyValidation.right;

const tableId = maybeTableId || generateLegacyModelSId();
Expand Down Expand Up @@ -260,6 +266,9 @@ async function handler(
tableId,
name,
description,
timestamp: timestamp ?? null,
tags: tags || [],
parents: parents || [],
});

if (upsertRes.isErr()) {
Expand Down
Loading

0 comments on commit 154952c

Please sign in to comment.