feat: databases store + move SQLite stuff to worker (#2833)
* move ownership of databases rows to sqlite workers

* optional limit/offset

* fix limit offset

* remove noisy tracing for worker heartbeat

* clear worker from DBs

* fix get rows

* move API calls to sqlite worker impl

* reimplement get rows through worker

* clone store arc

* fix cleanup code

* querying works 😌

* reimplement retrieve row

* file naming feedback

---------

Co-authored-by: Henry Fontanier <[email protected]>
fontanierh and Henry Fontanier authored Dec 13, 2023
1 parent 1b99ff6 commit 6c180c7
Showing 11 changed files with 1,198 additions and 626 deletions.
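Per the commit message, row reads no longer go through the Postgres store: an API handler now loads the database, resolves the SQLite worker that owns it, and delegates the row operation to that worker over HTTP. Below is a minimal sketch of what such a worker client call could look like; the route shape, types, and use of `reqwest` are assumptions for illustration, not the actual `sqlite_workers::client` API. The handlers in the diff that follows use this pattern via `db.sqlite_worker(...)` and `worker.get_row(...)`.

```rust
use anyhow::Result;
use serde_json::Value;

/// Hypothetical handle to a SQLite worker pod; the real client lives in
/// `sqlite_workers::client`, which is not shown in this diff.
pub struct SqliteWorker {
    /// Base URL of the worker pod (assumption).
    url: String,
}

impl SqliteWorker {
    /// Fetch a single row from the worker that owns `database_unique_id`.
    /// The route is illustrative only.
    pub async fn get_row(
        &self,
        database_unique_id: &str,
        table_id: &str,
        row_id: &str,
    ) -> Result<Option<Value>> {
        let url = format!(
            "{}/databases/{}/tables/{}/rows/{}",
            self.url, database_unique_id, table_id, row_id
        );
        let res = reqwest::get(&url).await?;
        // Treat a missing row as `None` rather than an error.
        if res.status() == reqwest::StatusCode::NOT_FOUND {
            return Ok(None);
        }
        let row: Value = res.error_for_status()?.json().await?;
        Ok(Some(row))
    }
}
```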
127 changes: 91 additions & 36 deletions core/bin/dust_api.rs
@@ -22,7 +22,7 @@ use dust::{
project::{self},
providers::provider::{provider, ProviderID},
run,
sqlite_workers::sqlite_workers,
sqlite_workers::client,
stores::postgres,
stores::store,
utils::{self, error_response, APIError, APIResponse},
@@ -142,7 +142,7 @@ impl APIState {
let store = self.store.clone();
tokio::task::spawn(async move {
match store
.sqlite_workers_cleanup(sqlite_workers::HEARTBEAT_INTERVAL_MS)
.sqlite_workers_cleanup(client::HEARTBEAT_INTERVAL_MS)
.await
{
Err(e) => {
@@ -2049,31 +2049,57 @@ async fn databases_rows_retrieve(
String,
String,
)>,

extract::Extension(state): extract::Extension<Arc<APIState>>,
) -> (StatusCode, Json<APIResponse>) {
let project = project::Project::new_from_id(project_id);

match state
.store
.load_database_row(&project, &data_source_id, &database_id, &table_id, &row_id)
.load_database(&project, &data_source_id, &database_id)
.await
{
Err(e) => error_response(
StatusCode::INTERNAL_SERVER_ERROR,
"internal_server_error",
"Failed to upsert database rows",
"Failed to retrieve database",
Some(e),
),
Ok(row) => (
StatusCode::OK,
Json(APIResponse {
error: None,
response: Some(json!({
"row": row
})),
}),
),
Ok(db) => match db {
None => error_response(
StatusCode::NOT_FOUND,
"database_not_found",
&format!("No database found for id `{}`", database_id),
None,
),
Some(db) => match db.sqlite_worker(state.store.clone()).await {
Err(e) => error_response(
StatusCode::INTERNAL_SERVER_ERROR,
"internal_server_error",
&format!("Failed to retrieve SQLite worker: {}", e),
Some(e),
),
Ok(worker) => match worker
.get_row(db.unique_id().as_str(), &table_id, &row_id)
.await
{
Err(e) => error_response(
StatusCode::INTERNAL_SERVER_ERROR,
"internal_server_error",
&format!("Failed to retrieve row: {}", e),
Some(e),
),
Ok(row) => (
StatusCode::OK,
Json(APIResponse {
error: None,
response: Some(json!({
"row": row,
})),
}),
),
},
},
},
}
}

@@ -2097,33 +2123,56 @@ async fn databases_rows_list(

match state
.store
.list_database_rows(
&project,
&data_source_id,
&database_id,
&table_id,
Some((query.limit, query.offset)),
)
.load_database(&project, &data_source_id, &database_id)
.await
{
Err(e) => error_response(
StatusCode::INTERNAL_SERVER_ERROR,
"internal_server_error",
"Failed to list database rows",
"Failed to retrieve database",
Some(e),
),
Ok((rows, total)) => (
StatusCode::OK,
Json(APIResponse {
error: None,
response: Some(json!({
"rows": rows,
"offset": query.offset,
"limit": query.limit,
"total": total,
})),
}),
Ok(None) => error_response(
StatusCode::NOT_FOUND,
"database_not_found",
&format!("No database found for id `{}`", database_id),
None,
),
Ok(Some(db)) => match db.sqlite_worker(state.store.clone()).await {
Err(e) => error_response(
StatusCode::INTERNAL_SERVER_ERROR,
"internal_server_error",
&format!("Failed to retrieve SQLite worker: {}", e),
Some(e),
),
Ok(worker) => match worker
.get_rows(
db.unique_id().as_str(),
&table_id,
Some((query.limit, query.offset)),
)
.await
{
Err(e) => error_response(
StatusCode::INTERNAL_SERVER_ERROR,
"internal_server_error",
&format!("Failed to list rows: {}", e),
Some(e),
),
Ok((rows, total)) => (
StatusCode::OK,
Json(APIResponse {
error: None,
response: Some(json!({
"offset": query.offset,
"limit": query.limit,
"total": total,
"rows": rows,
})),
}),
),
},
},
}
}
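Note how the handler above forwards pagination as an `Option`: `Some((query.limit, query.offset))` paginates, while `None` (per the "optional limit/offset" commit) would fetch every row. A hedged sketch of how a worker client might translate that option into query parameters; the route and parameter names are assumptions, not the actual worker API:

```rust
use anyhow::Result;
use serde_json::Value;

/// Hypothetical client-side pagination handling for `get_rows`.
async fn get_rows(
    worker_url: &str,
    database_unique_id: &str,
    table_id: &str,
    limit_offset: Option<(usize, usize)>,
) -> Result<(Vec<Value>, usize)> {
    let mut url = format!(
        "{}/databases/{}/tables/{}/rows",
        worker_url, database_unique_id, table_id
    );
    // `None` means "return all rows"; `Some((limit, offset))` paginates.
    if let Some((limit, offset)) = limit_offset {
        url = format!("{}?limit={}&offset={}", url, limit, offset);
    }
    let body: Value = reqwest::get(&url).await?.error_for_status()?.json().await?;
    let rows = body["rows"].as_array().cloned().unwrap_or_default();
    let total = body["total"].as_u64().unwrap_or(rows.len() as u64) as usize;
    Ok((rows, total))
}
```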

@@ -2179,7 +2228,7 @@ async fn databases_query_run(

// SQLite Workers

async fn sqlite_workers_hearbeat(
async fn sqlite_workers_heartbeat(
extract::Path(pod_name): extract::Path<String>,
extract::Extension(state): extract::Extension<Arc<APIState>>,
) -> (StatusCode, Json<APIResponse>) {
@@ -2299,7 +2348,7 @@ fn main() {
};

let state = Arc::new(APIState::new(store, QdrantClients::build().await?));
let app = Router::new()
let router = Router::new()

// Index
.route("/", get(index))
Expand Down Expand Up @@ -2434,7 +2483,6 @@ fn main() {
"/projects/:project_id/data_sources/:data_source_id/databases/:database_id/query",
post(databases_query_run),
)
.route("/sqlite_workers/:pod_name", post(sqlite_workers_hearbeat))
.route("/sqlite_workers/:pod_name", delete(sqlite_workers_delete))
// Misc
.route("/tokenize", post(tokenize))
@@ -2448,6 +2496,13 @@
)
.layer(extract::Extension(state.clone()));

// In a separate router, to avoid noisy tracing.
let sqlite_heartbeat_router = Router::new()
.route("/sqlite_workers/:pod_name", post(sqlite_workers_heartbeat))
.layer(extract::Extension(state.clone()));

let app = Router::new().merge(router).merge(sqlite_heartbeat_router);

// Start the APIState run loop.
let runloop_state = state.clone();
tokio::task::spawn(async move { runloop_state.run_loop().await });
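The heartbeat endpoint is registered on a separate router, per the comment in the hunk above, so that whatever tracing middleware wraps the main router does not emit a log line for every worker heartbeat. A sketch of the idea, assuming a `tower_http` `TraceLayer` on the main router (the actual middleware stack is outside this hunk):

```rust
use axum::{
    routing::{get, post},
    Router,
};
use tower_http::trace::TraceLayer;

// Stand-in handlers for illustration.
async fn index() -> &'static str {
    "ok"
}
async fn sqlite_workers_heartbeat() -> &'static str {
    "ok"
}

fn build_app() -> Router {
    // Main API router: wrapped in the HTTP trace layer.
    let router = Router::new()
        .route("/", get(index))
        .layer(TraceLayer::new_for_http());

    // Heartbeat router: merged without the trace layer, so the frequent
    // worker heartbeats do not flood the logs.
    let sqlite_heartbeat_router = Router::new()
        .route("/sqlite_workers/:pod_name", post(sqlite_workers_heartbeat));

    Router::new().merge(router).merge(sqlite_heartbeat_router)
}
```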