[Keyword search] Backfill nodes index with PSQL nodes table (#9582)
* [Keyword search] Backfill nodes index with PSQL nodes table

Description
---
Fixes #8765

Risks
---
We read from the read replica, so the primary database should be unaffected.
Batch upserts should be quite fast too.

Deploy
---
core
run backfill
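
For reference, the backfill is run with `cargo run --bin elasticsearch_backfill_index -- --index-version <version> [--skip-confirmation] [--start-cursor <cursor>] [--batch-size <batch_size>]`, with ELASTICSEARCH_URL, ELASTICSEARCH_USERNAME, ELASTICSEARCH_PASSWORD, DUST_REGION and CORE_DATABASE_READ_REPLICA_URI set (see the new binary below).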

* remove region

* fix internal id

* fix row cursor

* clean and add args
philipperolet authored Dec 22, 2024
1 parent d984b86 commit 0ab77ac
Showing 5 changed files with 207 additions and 17 deletions.
4 changes: 4 additions & 0 deletions core/Cargo.toml
@@ -27,6 +27,10 @@ path = "bin/init_db.rs"
name = "elasticsearch_create_index"
path = "bin/elasticsearch/create_index.rs"

[[bin]]
name = "elasticsearch_backfill_index"
path = "bin/elasticsearch/backfill_index.rs"

[[bin]]
name = "qdrant_create_collection"
path = "bin/qdrant/create_collection.rs"
137 changes: 137 additions & 0 deletions core/bin/elasticsearch/backfill_index.rs
@@ -0,0 +1,137 @@
use clap::Parser;
use dust::{
    data_sources::node::Node,
    search_stores::search_store::ElasticsearchSearchStore,
    stores::{postgres::PostgresStore, store::Store},
    utils::{self},
};
use elasticsearch::{http::request::JsonBody, indices::IndicesExistsParts, BulkParts};
use http::StatusCode;
use serde_json::json;

#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
struct Args {
    #[arg(long, help = "The version of the index")]
    index_version: u32,

    #[arg(long, help = "Skip confirmation")]
    skip_confirmation: bool,

    #[arg(long, help = "The cursor to start from", default_value = "0")]
    start_cursor: i64,

    #[arg(long, help = "The batch size", default_value = "100")]
    batch_size: i64,
}

/*
 * Backfills nodes index in Elasticsearch for core using the postgres table `data_sources_nodes`
 *
 * Usage:
 * cargo run --bin elasticsearch_backfill_index -- --index-version <version> [--skip-confirmation] [--start-cursor <cursor>] [--batch-size <batch_size>]
 *
 */
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // parse args and env vars
    let args = Args::parse();
    let index_name = "data_sources_nodes";
    let index_version = args.index_version;
    let batch_size = args.batch_size;
    let start_cursor = args.start_cursor;

    let url = std::env::var("ELASTICSEARCH_URL").expect("ELASTICSEARCH_URL must be set");
    let username =
        std::env::var("ELASTICSEARCH_USERNAME").expect("ELASTICSEARCH_USERNAME must be set");
    let password =
        std::env::var("ELASTICSEARCH_PASSWORD").expect("ELASTICSEARCH_PASSWORD must be set");

    let region = std::env::var("DUST_REGION").expect("DUST_REGION must be set");

    // create ES client
    let search_store = ElasticsearchSearchStore::new(&url, &username, &password).await?;

    let index_fullname = format!("core.{}_{}", index_name, index_version);

    // err if index does not exist
    let response = search_store
        .client
        .indices()
        .exists(IndicesExistsParts::Index(&[index_fullname.as_str()]))
        .send()
        .await?;

    if response.status_code() != StatusCode::OK {
        return Err(anyhow::anyhow!("Index does not exist").into());
    }

    if !args.skip_confirmation {
        println!(
            "Are you sure you want to backfill the index {} in region {}? (y/N)",
            index_fullname, region
        );
        let mut input = String::new();
        std::io::stdin().read_line(&mut input).unwrap();
        if input.trim() != "y" {
            return Err(anyhow::anyhow!("Aborted").into());
        }
    }

    let db_uri = std::env::var("CORE_DATABASE_READ_REPLICA_URI")
        .expect("CORE_DATABASE_READ_REPLICA_URI must be set");
    let store = PostgresStore::new(&db_uri).await?;
    // loop on all nodes in postgres using id as cursor, stopping when timestamp
    // is greater than now
    let mut next_cursor = start_cursor;
    let now = utils::now();
    loop {
        println!(
            "Processing {} nodes, starting at id {}",
            batch_size, next_cursor
        );
        let (nodes, cursor) =
            get_node_batch(next_cursor, batch_size, Box::new(store.clone())).await?;
        if nodes.is_empty() || nodes.first().unwrap().timestamp > now {
            break;
        }
        next_cursor = cursor;

        // Build the bulk body: for each node, an `index` action line followed by the node document.
        let nodes_body: Vec<JsonBody<_>> = nodes
            .into_iter()
            .flat_map(|node| {
                [
                    json!({"index": {"_id": node.unique_id()}}).into(),
                    json!(node).into(),
                ]
            })
            .collect();
        search_store
            .client
            .bulk(BulkParts::Index(index_fullname.as_str()))
            .body(nodes_body)
            .send()
            .await?;
    }

    Ok(())
}

async fn get_node_batch(
    next_cursor: i64,
    batch_size: i64,
    store: Box<dyn Store + Sync + Send>,
) -> Result<(Vec<Node>, i64), Box<dyn std::error::Error>> {
    let nodes = store
        .list_data_source_nodes(next_cursor, batch_size)
        .await?;
    let last_node = nodes.last().cloned();
    match last_node {
        Some((_, last_row_id, _)) => Ok((
            nodes.into_iter().map(|(node, _, _)| node).collect(),
            last_row_id,
        )),
        None => Ok((vec![], 0)),
    }
}
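
A note on the bulk call above: the response body is not inspected, and Elasticsearch can answer a `_bulk` request with HTTP 200 even when individual items failed. A minimal sketch of such a check with the same client, assuming the standard `errors` flag of the bulk response (not part of this commit):

    let response = search_store
        .client
        .bulk(BulkParts::Index(index_fullname.as_str()))
        .body(nodes_body)
        .send()
        .await?;
    // The `_bulk` response carries an `errors` boolean plus per-item results.
    let body: serde_json::Value = response.json().await?;
    if body["errors"].as_bool().unwrap_or(false) {
        eprintln!("bulk request reported item-level errors: {}", body);
    }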
18 changes: 1 addition & 17 deletions core/bin/elasticsearch/create_index.rs
@@ -1,26 +1,10 @@
 use std::collections::HashMap;

-use clap::{Parser, ValueEnum};
+use clap::Parser;
 use dust::search_stores::search_store::ElasticsearchSearchStore;
 use elasticsearch::indices::{IndicesCreateParts, IndicesDeleteAliasParts, IndicesExistsParts};
 use http::StatusCode;

-#[derive(Parser, Debug, Clone, ValueEnum)]
-enum Region {
-    Local,
-    #[clap(name = "us-central-1")]
-    UsCentral1,
-}
-
-impl std::fmt::Display for Region {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        match self {
-            Region::Local => write!(f, "local"),
-            Region::UsCentral1 => write!(f, "us-central-1"),
-        }
-    }
-}
-
 #[derive(Parser, Debug)]
 #[command(author, version, about, long_about = None)]
 struct Args {
58 changes: 58 additions & 0 deletions core/src/stores/postgres.rs
@@ -3525,6 +3525,64 @@ impl Store for PostgresStore {
        }
    }

    async fn list_data_source_nodes(
        &self,
        id_cursor: i64,
        batch_size: i64,
    ) -> Result<Vec<(Node, i64, i64)>> {
        let pool = self.pool.clone();
        let c = pool.get().await?;

        let stmt = c
            .prepare(
                "SELECT dsn.timestamp, dsn.title, dsn.mime_type, dsn.parents, dsn.node_id, dsn.document, dsn.\"table\", dsn.folder, ds.data_source_id, ds.internal_id, dsn.id \
                FROM data_sources_nodes dsn JOIN data_sources ds ON dsn.data_source = ds.id \
                WHERE dsn.id > $1 ORDER BY dsn.id ASC LIMIT $2",
            )
            .await?;
        let rows = c.query(&stmt, &[&id_cursor, &batch_size]).await?;

        let nodes: Vec<(Node, i64, i64)> = rows
            .iter()
            .map(|row| {
                let timestamp: i64 = row.get::<_, i64>(0);
                let title: String = row.get::<_, String>(1);
                let mime_type: String = row.get::<_, String>(2);
                let parents: Vec<String> = row.get::<_, Vec<String>>(3);
                let node_id: String = row.get::<_, String>(4);
                let document_row_id = row.get::<_, Option<i64>>(5);
                let table_row_id = row.get::<_, Option<i64>>(6);
                let folder_row_id = row.get::<_, Option<i64>>(7);
                let data_source_id: String = row.get::<_, String>(8);
                let data_source_internal_id: String = row.get::<_, String>(9);
                let (node_type, element_row_id) =
                    match (document_row_id, table_row_id, folder_row_id) {
                        (Some(id), None, None) => (NodeType::Document, id),
                        (None, Some(id), None) => (NodeType::Table, id),
                        (None, None, Some(id)) => (NodeType::Folder, id),
                        _ => unreachable!(),
                    };
                let row_id = row.get::<_, i64>(10);
                (
                    Node::new(
                        &data_source_id,
                        &data_source_internal_id,
                        &node_id,
                        node_type,
                        timestamp as u64,
                        &title,
                        &mime_type,
                        parents.get(1).cloned(),
                        parents,
                    ),
                    row_id,
                    element_row_id,
                )
            })
            .collect::<Vec<_>>();
        Ok(nodes)
    }

    async fn llm_cache_get(
        &self,
        project: &Project,
7 changes: 7 additions & 0 deletions core/src/stores/store.rs
@@ -347,6 +347,13 @@
        data_source_id: &str,
        node_id: &str,
    ) -> Result<Option<(Node, i64)>>;
    // returns a list of (node, row_id, element_row_id)
    async fn list_data_source_nodes(
        &self,
        id_cursor: i64,
        batch_size: i64,
    ) -> Result<Vec<(Node, i64, i64)>>;

    // LLM Cache
    async fn llm_cache_get(
        &self,
