Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core database: small nits and refactoring #2625

Merged
merged 4 commits into from
Nov 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 2 additions & 5 deletions core/bin/dust_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2076,7 +2076,7 @@ async fn databases_schema_retrieve(
&format!("No database found for id `{}`", database_id),
None,
),
Ok(Some(db)) => match db.get_schema(&project, state.store.clone()).await {
Ok(Some(db)) => match db.get_schema(state.store.clone()).await {
Err(e) => error_response(
StatusCode::INTERNAL_SERVER_ERROR,
"internal_server_error",
Expand Down Expand Up @@ -2125,10 +2125,7 @@ async fn databases_query_run(
&format!("No database found for id `{}`", database_id),
None,
),
Ok(Some(db)) => match db
.query(&project, state.store.clone(), &payload.query)
.await
{
Ok(Some(db)) => match db.query(state.store.clone(), &payload.query).await {
Err(e) => error_response(
StatusCode::INTERNAL_SERVER_ERROR,
"internal_server_error",
Expand Down
2 changes: 1 addition & 1 deletion core/src/blocks/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ impl Block for Database {
))?,
};

let (rows, schema) = match database.query(&project, env.store.clone(), &query).await {
let (rows, schema) = match database.query(env.store.clone(), &query).await {
Ok(r) => r,
Err(e) => Err(anyhow!(
"Error querying database `{}` in data source `{}`: {}",
Expand Down
2 changes: 1 addition & 1 deletion core/src/blocks/database_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ async fn get_database_schema(
))?,
};

match database.get_schema(&project, env.store.clone()).await {
match database.get_schema(env.store.clone()).await {
Ok(s) => Ok(s),
Err(e) => Err(anyhow!(
"Error getting schema for database `{}` in data source `{}`: {}",
Expand Down
33 changes: 16 additions & 17 deletions core/src/databases/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ impl ToString for DatabaseType {

#[derive(Debug, Serialize)]
pub struct Database {
project: Project,
created: u64,
data_source_id: String,
database_id: String,
Expand All @@ -35,8 +36,15 @@ pub struct Database {
}

impl Database {
pub fn new(created: u64, data_source_id: &str, database_id: &str, name: &str) -> Self {
pub fn new(
project: &Project,
created: u64,
data_source_id: &str,
database_id: &str,
name: &str,
) -> Self {
Database {
project: project.clone(),
created: created,
data_source_id: data_source_id.to_string(),
database_id: database_id.to_string(),
Expand All @@ -45,15 +53,11 @@ impl Database {
}
}

pub async fn get_schema(
&self,
project: &Project,
store: Box<dyn Store + Sync + Send>,
) -> Result<DatabaseSchema> {
pub async fn get_schema(&self, store: Box<dyn Store + Sync + Send>) -> Result<DatabaseSchema> {
match self.db_type {
DatabaseType::REMOTE => Err(anyhow!("Remote DB not implemented.")),
DatabaseType::LOCAL => {
let rows = self.get_rows(project, store).await?;
let rows = self.get_rows(store).await?;

let schema = rows
.par_iter()
Expand All @@ -72,7 +76,6 @@ impl Database {

pub async fn create_in_memory_sqlite_conn(
&self,
project: &Project,
store: Box<dyn Store + Sync + Send>,
) -> Result<Connection> {
match self.db_type {
Expand All @@ -82,14 +85,14 @@ impl Database {
DatabaseType::LOCAL => {
let time_build_db_start = utils::now();

let schema = self.get_schema(project, store.clone()).await?;
let schema = self.get_schema(store.clone()).await?;
utils::done(&format!(
"DSSTRUCTSTAT Finished retrieving schema: duration={}ms",
utils::now() - time_build_db_start
));

let time_get_rows_start = utils::now();
let rows_by_table = match self.get_rows(project, store.clone()).await {
let rows_by_table = match self.get_rows(store.clone()).await {
Ok(rows) => Ok(rows
.into_iter()
.map(|(table, rows)| (table.table_id().to_string(), rows))
Expand Down Expand Up @@ -171,16 +174,13 @@ impl Database {

pub async fn query(
&self,
project: &Project,
store: Box<dyn Store + Sync + Send>,
query: &str,
) -> Result<(Vec<DatabaseRow>, TableSchema)> {
match self.db_type {
DatabaseType::REMOTE => Err(anyhow!("Remote DB not implemented.")),
DatabaseType::LOCAL => {
let conn = self
.create_in_memory_sqlite_conn(project, store.clone())
.await?;
let conn = self.create_in_memory_sqlite_conn(store.clone()).await?;

let time_query_start = utils::now();

Expand Down Expand Up @@ -270,11 +270,10 @@ impl Database {

pub async fn get_rows(
&self,
project: &Project,
store: Box<dyn Store + Sync + Send>,
) -> Result<Vec<(DatabaseTable, Vec<DatabaseRow>)>> {
let (tables, _) = store
.list_databases_tables(&project, &self.data_source_id, &self.database_id, None)
.list_databases_tables(&self.project, &self.data_source_id, &self.database_id, None)
.await?;

// Concurrently retrieve table rows.
Expand All @@ -287,7 +286,7 @@ impl Database {
async move {
let (rows, _) = store
.list_database_rows(
project,
&self.project,
self.data_source_id.as_str(),
self.database_id.as_str(),
table.table_id(),
Expand Down
125 changes: 25 additions & 100 deletions core/src/stores/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ impl Store for PostgresStore {
let pool = self.pool.clone();
let c = pool.get().await?;

// Check that the dataset_id and hash exist
// Check that the dataset_id and hash exist.
let r = c
.query(
"SELECT id, created FROM datasets
Expand All @@ -218,7 +218,7 @@ impl Store for PostgresStore {
}
let (row_id, created) = d.unwrap();

// Retrieve data points through datasets_joins
// Retrieve data points through datasets_joins.
let stmt = c
.prepare(
"SELECT datasets_joins.point_idx, datasets_points.json \
Expand Down Expand Up @@ -784,7 +784,7 @@ impl Store for PostgresStore {

match block {
None => {
// Retrieve data points through datasets_joins
// Retrieve data points through datasets_joins.
let stmt = c
.prepare(
"SELECT \
Expand Down Expand Up @@ -835,7 +835,7 @@ impl Store for PostgresStore {
match block {
None => (),
Some((block_type, block_name)) => {
// Retrieve data points through datasets_joins for one block
// Retrieve data points through datasets_joins for one block.
let stmt = c
.prepare(
"SELECT \
Expand Down Expand Up @@ -1227,7 +1227,7 @@ impl Store for PostgresStore {

let tx = c.transaction().await?;

// get current tags and put them into a set
// Get current tags and put them into a set.
let current_tags_result = tx
.query(
"SELECT tags_array FROM data_sources_documents WHERE data_source = $1 \
Expand All @@ -1243,7 +1243,7 @@ impl Store for PostgresStore {
}
};

// update the set of tags based on the add and remove lists
// Update the set of tags based on the add and remove lists.
for tag in add_tags {
current_tags.insert(tag.clone());
}
Expand Down Expand Up @@ -1292,8 +1292,8 @@ impl Store for PostgresStore {
_ => unreachable!(),
};

// the `created` timestamp of the version specified by `latest_hash`
// (if `latest_hash` is `None`, then this is the latest version's `created` timestamp)
// The `created` timestamp of the version specified by `latest_hash`
// (if `latest_hash` is `None`, then this is the latest version's `created` timestamp).
let latest_hash_created: i64 = match latest_hash {
Some(latest_hash) => {
let stmt = c
Expand Down Expand Up @@ -1660,7 +1660,7 @@ impl Store for PostgresStore {
let chunk_count: i64 = r.get(9);

let tags = if remove_system_tags {
// remove tags that are prefixed with the system tag prefix
// Remove tags that are prefixed with the system tag prefix.
tags.into_iter()
.filter(|t| !t.starts_with(DATA_SOURCE_DOCUMENT_SYSTEM_TAG_PREFIX))
.collect()
Expand Down Expand Up @@ -1788,7 +1788,7 @@ impl Store for PostgresStore {
let pool = self.pool.clone();
let c = pool.get().await?;

// get the data source row id
// Get the data source row id.
let stmt = c
.prepare(
"SELECT id FROM data_sources WHERE project = $1 AND data_source_id = $2 LIMIT 1",
Expand Down Expand Up @@ -1820,7 +1820,13 @@ impl Store for PostgresStore {
)
.await?;

Ok(Database::new(db_created, &data_source_id, &db_id, &db_name))
Ok(Database::new(
&Project::new_from_id(project_id),
db_created,
&data_source_id,
&db_id,
&db_name,
))
}

async fn load_database(
Expand Down Expand Up @@ -1855,6 +1861,7 @@ impl Store for PostgresStore {
match d {
None => Ok(None),
Some((created, database_id, name, data_source_id)) => Ok(Some(Database::new(
&Project::new_from_id(project_id),
created as u64,
&data_source_id,
&database_id,
Expand Down Expand Up @@ -1908,6 +1915,7 @@ impl Store for PostgresStore {
let data_source_id: String = row.get(3);

Ok(Database::new(
&Project::new_from_id(project_id),
created as u64,
&data_source_id,
&database_id,
Expand Down Expand Up @@ -2148,89 +2156,6 @@ impl Store for PostgresStore {
Ok((tables, total))
}

/// Inserts a row into a database table, or overwrites its content if it already exists.
///
/// The data source, database and table are resolved to their internal row ids in turn
/// (the data source lookup is scoped to `project`), then the row is written to
/// `databases_rows`. When a row with the same `(row_id, database_table)` pair already
/// exists, only its `content` column is replaced.
///
/// The returned `DatabaseRow` is built from the timestamp taken by this call, `row_id`
/// and `content`, including when an existing row was updated rather than inserted.
///
/// # Errors
///
/// Returns an error if no connection can be obtained from the pool, or if preparing or
/// executing any of the statements fails.
///
/// # Panics
///
/// Panics if the data source, the database or the table cannot be found.
async fn upsert_database_row(
    &self,
    project: &Project,
    data_source_id: &str,
    database_id: &str,
    table_id: &str,
    row_id: &str,
    content: &Value,
) -> Result<DatabaseRow> {
    let project_id = project.project_id();
    let data_source_id = data_source_id.to_string();
    let database_id = database_id.to_string();
    let table_id = table_id.to_string();

    let pool = self.pool.clone();
    let c = pool.get().await?;

    // Get the data source row id.
    let stmt = c
        .prepare(
            "SELECT id FROM data_sources WHERE project = $1 AND data_source_id = $2 LIMIT 1",
        )
        .await?;

    let r = c.query(&stmt, &[&project_id, &data_source_id]).await?;
    let data_source_row_id: i64 = match r.len() {
        0 => panic!("Unknown DataSource: {}", data_source_id),
        1 => r[0].get(0),
        // The query is `LIMIT 1`, so more than one row cannot be returned.
        _ => unreachable!(),
    };

    // Get the database row id.
    let stmt = c
        .prepare("SELECT id FROM databases WHERE data_source = $1 AND database_id = $2 LIMIT 1")
        .await?;

    let r = c.query(&stmt, &[&data_source_row_id, &database_id]).await?;

    let database_row_id: i64 = match r.len() {
        0 => panic!("Unknown Database: {}", database_id),
        1 => r[0].get(0),
        _ => unreachable!(),
    };

    // Get the table row id.
    let stmt = c
        .prepare(
            "SELECT id FROM databases_tables WHERE database = $1 AND table_id = $2 LIMIT 1",
        )
        .await?;
    let r = c.query(&stmt, &[&database_row_id, &table_id]).await?;
    let table_row_id: i64 = match r.len() {
        0 => panic!("Unknown Table: {}", table_id),
        1 => r[0].get(0),
        _ => unreachable!(),
    };

    let row_created = utils::now();

    let row_id = row_id.to_string();
    let row_data = content.to_string();

    // Upsert Database Row.
    // Five target columns take `DEFAULT` plus exactly four placeholders, matching the four
    // parameters bound below.
    let stmt = c
        .prepare(
            "INSERT INTO databases_rows \
               (id, database_table, created, row_id, content) \
               VALUES (DEFAULT, $1, $2, $3, $4) \
               ON CONFLICT (row_id, database_table) DO UPDATE \
               SET content = EXCLUDED.content \
               RETURNING id",
        )
        .await?;

    c.query_one(
        &stmt,
        &[&table_row_id, &(row_created as i64), &row_id, &row_data],
    )
    .await?;

    Ok(DatabaseRow::new(row_created, Some(row_id), content))
}

async fn load_database_row(
&self,
project: &Project,
Expand Down Expand Up @@ -2406,7 +2331,7 @@ impl Store for PostgresStore {
let pool = self.pool.clone();
let mut c = pool.get().await?;

// get the data source row id
// Get the data source row id.
let stmt = c
.prepare(
"SELECT id FROM data_sources WHERE project = $1 AND data_source_id = $2 LIMIT 1",
Expand All @@ -2419,7 +2344,7 @@ impl Store for PostgresStore {
_ => unreachable!(),
};

// get the database row id
// Get the database row id.
let stmt = c
.prepare("SELECT id FROM databases WHERE data_source = $1 AND database_id = $2 LIMIT 1")
.await?;
Expand All @@ -2430,7 +2355,7 @@ impl Store for PostgresStore {
_ => unreachable!(),
};

// get the table row id
// Get the table row id.
let stmt = c
.prepare(
"SELECT id FROM databases_tables WHERE database = $1 AND table_id = $2 LIMIT 1",
Expand All @@ -2443,18 +2368,18 @@ impl Store for PostgresStore {
_ => unreachable!(),
};

// start transaction
// Start transaction.
let c = c.transaction().await?;

// truncate table if required
// Truncate table if required.
if truncate {
let stmt = c
.prepare("DELETE FROM databases_rows WHERE database_table = $1")
.await?;
c.execute(&stmt, &[&table_row_id]).await?;
}

// prepare insertion/updation statement
// Prepare insertion/updation statement.
let stmt = c
.prepare(
"INSERT INTO databases_rows \
Expand Down
Loading
Loading