Refactoring
nygrenh committed Oct 9, 2024
1 parent 3856f45 commit a93d729
Showing 3 changed files with 148 additions and 116 deletions.
2 changes: 1 addition & 1 deletion services/headless-lms/models/src/chatbot_configurations.rs
@@ -97,7 +97,7 @@ AND deleted_at IS NULL
Ok(res)
}

pub async fn get_for_azure_search_maintananace(
pub async fn get_for_azure_search_maintenance(
conn: &mut PgConnection,
) -> ModelResult<Vec<ChatbotConfiguration>> {
let res = sqlx::query_as!(
2 changes: 1 addition & 1 deletion services/headless-lms/models/src/chatbot_page_sync_statuses.rs
@@ -15,7 +15,7 @@ pub struct ChatbotPageSyncStatus {
pub synced_page_revision_id: Option<Uuid>,
}

pub async fn make_sure_sync_statuses_exist(
pub async fn ensure_sync_statuses_exist(
conn: &mut PgConnection,
course_ids: &[Uuid],
) -> ModelResult<HashMap<Uuid, Vec<ChatbotPageSyncStatus>>> {
260 changes: 146 additions & 114 deletions services/headless-lms/server/src/programs/chatbot_syncer.rs
@@ -4,9 +4,13 @@ use std::{
time::Duration,
};

use dotenv::dotenv;
use sqlx::{PgConnection, PgPool};
use url::Url;
use uuid::Uuid;

use crate::setup_tracing;

use dotenv::dotenv;
use headless_lms_chatbot::{
azure_blob_storage::AzureBlobClient,
azure_search_index::{create_search_index, does_search_index_exist},
@@ -19,17 +23,53 @@ use headless_lms_utils::{
document_schema_processor::{remove_sensitive_attributes, GutenbergBlock},
ApplicationConfiguration,
};
use sqlx::{PgConnection, PgPool};
use std::path::PathBuf;
use url::Url;
use uuid::Uuid;

const SYNC_INTERVAL_SECS: u64 = 10;
const PRINT_STILL_RUNNING_MESSAGE_TICKS_THRESHOLD: u32 = 60;

pub async fn main() -> anyhow::Result<()> {
initialize_environment()?;
let config = initialize_configuration().await?;
let db_pool = initialize_database_pool(&config.database_url).await?;
let mut conn = db_pool.acquire().await?;
let blob_client = initialize_blob_client(&config).await?;

let mut interval = tokio::time::interval(Duration::from_secs(SYNC_INTERVAL_SECS));
let mut ticks = 0;

info!("Starting chatbot syncer.");

loop {
interval.tick().await;
ticks += 1;

if ticks >= PRINT_STILL_RUNNING_MESSAGE_TICKS_THRESHOLD {
ticks = 0;
info!("Still syncing for chatbot.");
}
if let Err(e) = sync_pages(&mut conn, &config, &blob_client).await {
error!("Error during synchronization: {:?}", e);
}
}
}

fn initialize_environment() -> anyhow::Result<()> {
env::set_var("RUST_LOG", "info,actix_web=info,sqlx=warn");
dotenv().ok();
setup_tracing()?;
Ok(())
}

struct SyncerConfig {
database_url: String,
name_prefix: String,
app_configuration: ApplicationConfiguration,
}

async fn initialize_configuration() -> anyhow::Result<SyncerConfig> {
let database_url = env::var("DATABASE_URL")
.unwrap_or_else(|_| "postgres://localhost/headless_lms_dev".to_string());

let base_url = Url::parse(&env::var("BASE_URL").expect("BASE_URL must be defined"))
.expect("BASE_URL must be a valid URL");

@@ -38,106 +78,101 @@ pub async fn main() -> anyhow::Result<()> {
.expect("BASE_URL must have a host")
.replace(".", "-");

let app_config = ApplicationConfiguration::try_from_env()?;
let blob_storage_client = AzureBlobClient::new(&app_config, &name_prefix).await?;
blob_storage_client.ensure_container_exists().await?;
let app_configuration = ApplicationConfiguration::try_from_env()?;

let db_pool = PgPool::connect(&database_url).await?;
let mut conn = db_pool.acquire().await?;
let mut interval = tokio::time::interval(Duration::from_secs(10));
let mut ticks = 60;
Ok(SyncerConfig {
database_url,
name_prefix,
app_configuration,
})
}
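
For context, the `name_prefix` built above is just the `BASE_URL` host with dots turned into dashes, presumably to keep it valid in Azure search index and blob container names. A minimal sketch of that derivation, using a hypothetical URL (the real value comes from the `BASE_URL` environment variable):

```rust
use url::Url;

fn main() {
    // Hypothetical BASE_URL; in the syncer it is read from the environment.
    let base_url = Url::parse("https://courses.example.com").unwrap();
    let name_prefix = base_url
        .host_str()
        .expect("BASE_URL must have a host")
        .replace(".", "-");
    // Dots become dashes so the prefix can be embedded in resource names.
    assert_eq!(name_prefix, "courses-example-com");
}
```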

loop {
interval.tick().await;
ticks += 1;
// 60 10 second intervals = 10 minutes
if ticks > 60 {
// Occasionally prints a reminder that the service is still running
ticks = 0;
tracing::info!("Syncing pages to chatbot backend.");
sync_pages(&mut conn, &name_prefix, &app_config, &blob_storage_client).await?;
}
}
/// Initializes the PostgreSQL connection pool.
async fn initialize_database_pool(database_url: &str) -> anyhow::Result<PgPool> {
PgPool::connect(database_url).await.map_err(|e| {
anyhow::anyhow!(
"Failed to connect to the database at {}: {:?}",
database_url,
e
)
})
}

/// Initializes the Azure Blob Storage client.
async fn initialize_blob_client(config: &SyncerConfig) -> anyhow::Result<AzureBlobClient> {
let blob_client = AzureBlobClient::new(&config.app_configuration, &config.name_prefix).await?;
blob_client.ensure_container_exists().await?;
Ok(blob_client)
}

/// Continuously syncs page contents to the chatbot backend.
/// Synchronizes pages to the chatbot backend.
async fn sync_pages(
conn: &mut sqlx::PgConnection,
index_name_prefix: &str,
app_config: &ApplicationConfiguration,
blob_storage_client: &AzureBlobClient,
conn: &mut PgConnection,
config: &SyncerConfig,
blob_client: &AzureBlobClient,
) -> anyhow::Result<()> {
let chatbot_configurations =
headless_lms_models::chatbot_configurations::get_for_azure_search_maintananace(conn)
.await?;
let course_ids = chatbot_configurations
let chatbot_configs =
headless_lms_models::chatbot_configurations::get_for_azure_search_maintenance(conn).await?;

let course_ids: Vec<Uuid> = chatbot_configs
.iter()
.map(|config| config.course_id)
.collect::<HashSet<_>>()
.into_iter()
.collect::<Vec<_>>();
let sync_statuses_by_course_id =
headless_lms_models::chatbot_page_sync_statuses::make_sure_sync_statuses_exist(
.collect();

let sync_statuses =
headless_lms_models::chatbot_page_sync_statuses::ensure_sync_statuses_exist(
conn,
&course_ids,
)
.await?;
let latest_history_entries_by_page_id =

let latest_histories =
headless_lms_models::page_history::get_latest_history_entries_for_pages_by_course_ids(
conn,
&course_ids,
)
.await?;
for (course_id, statuses) in sync_statuses_by_course_id.iter() {
let statuses_not_up_to_date = statuses

for (course_id, statuses) in sync_statuses.iter() {
let outdated_statuses: Vec<_> = statuses
.iter()
.filter(|status| {
if let Some(history_entry) = latest_history_entries_by_page_id.get(&status.page_id)
{
status.synced_page_revision_id != Some(history_entry.id)
} else {
warn!(
"No history entry found for page with id {}. Skipping syncing. ",
status.page_id
);
false
}
latest_histories
.get(&status.page_id)
.map_or(false, |history| {
status.synced_page_revision_id != Some(history.id)
})
})
.collect::<Vec<_>>();
if statuses_not_up_to_date.is_empty() {
.collect();

if outdated_statuses.is_empty() {
continue;
}

info!(
"Syncing {} pages for course with id {}.",
statuses_not_up_to_date.len(),
"Syncing {} pages for course ID: {}.",
outdated_statuses.len(),
course_id
);
let index_name = format!("{}-{}", index_name_prefix, course_id);
ensure_index_exists(&index_name, app_config).await?;
let page_ids = statuses_not_up_to_date
.iter()
.map(|status| status.page_id)
.collect::<Vec<_>>();

let index_name = format!("{}-{}", config.name_prefix, course_id);
ensure_search_index_exists(&index_name, &config.app_configuration).await?;

let page_ids: Vec<Uuid> = outdated_statuses.iter().map(|s| s.page_id).collect();
let pages = headless_lms_models::pages::get_by_ids(conn, &page_ids).await?;

sync_pages_batch(
conn,
&index_name,
&pages,
&latest_history_entries_by_page_id,
app_config,
blob_storage_client,
*course_id,
)
.await?;
sync_pages_batch(conn, &pages, &latest_histories, blob_client).await?;

// Delete old files only when we have synced something new (thanks to the continue above). This way we don't constantly hammer the api when nothing has changed.
delete_old_files(&mut *conn, *course_id, blob_storage_client).await?;
delete_old_files(conn, *course_id, blob_client).await?;
}

Ok(())
}
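
As a side note, the filter above re-syncs only pages whose latest history entry no longer matches the recorded `synced_page_revision_id`; pages with no history entry at all are skipped. A minimal sketch of that check in isolation (the `needs_sync` helper and the simplified `page_id -> latest history id` map below are illustrative, not part of the codebase):

```rust
use std::collections::HashMap;
use uuid::Uuid;

/// Illustrative helper (not in the codebase): a page needs re-syncing when its
/// latest history id differs from the revision recorded in its sync status.
/// Pages without any history entry are left alone.
fn needs_sync(
    synced_page_revision_id: Option<Uuid>,
    page_id: Uuid,
    latest_history_ids: &HashMap<Uuid, Uuid>, // page_id -> latest history entry id
) -> bool {
    latest_history_ids
        .get(&page_id)
        .map_or(false, |latest| synced_page_revision_id != Some(*latest))
}

fn main() {
    let page_id = Uuid::new_v4();
    let latest = Uuid::new_v4();
    let histories = HashMap::from([(page_id, latest)]);

    // Never synced before -> needs syncing.
    assert!(needs_sync(None, page_id, &histories));
    // Already at the latest revision -> nothing to do.
    assert!(!needs_sync(Some(latest), page_id, &histories));
    // No history entry for the page -> skipped.
    assert!(!needs_sync(None, Uuid::new_v4(), &histories));
}
```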

async fn ensure_index_exists(
/// Ensures that the specified search index exists, creating it if necessary.
async fn ensure_search_index_exists(
index_name: &str,
app_config: &ApplicationConfiguration,
) -> anyhow::Result<()> {
@@ -147,88 +182,85 @@ async fn ensure_index_exists(
Ok(())
}

/// Processes and synchronizes a batch of pages.
async fn sync_pages_batch(
conn: &mut PgConnection,
index_name: &str,
pages: &[Page],
latest_history_entries_by_page_id: &HashMap<Uuid, PageHistory>,
app_config: &ApplicationConfiguration,
blob_storage_client: &AzureBlobClient,
course_id: Uuid,
latest_histories: &HashMap<Uuid, PageHistory>,
blob_client: &AzureBlobClient,
) -> anyhow::Result<()> {
let mut allowed_file_paths = Vec::new();

for page in pages {
info!("Syncing page with id {}.", page.id);
info!("Syncing page ID: {}.", page.id);

let parsed_content: Vec<GutenbergBlock> = serde_json::from_value(page.content.clone())?;
let content = remove_sensitive_attributes(parsed_content);
let content_string = serde_json::to_string(&content)?;
let content_bytes = content_string.as_bytes();
let blob_path = page_to_blob_path(&page)?;
allowed_file_paths.push(blob_path.clone());
let sanitized_content = remove_sensitive_attributes(parsed_content);
let content_string = serde_json::to_string(&sanitized_content)?;
let blob_path = generate_blob_path(page)?;

blob_storage_client
.upload_file(&blob_path, content_bytes)
allowed_file_paths.push(blob_path.clone());
blob_client
.upload_file(&blob_path, content_string.as_bytes())
.await?;
}

// Mark the documents we just uploaded as synced
let page_id_to_latest_history_id = pages
let page_revision_map: HashMap<Uuid, Uuid> = pages
.iter()
.map(|page| (page.id, latest_history_entries_by_page_id[&page.id].id))
.collect::<HashMap<_, _>>();
.map(|page| (page.id, latest_histories[&page.id].id))
.collect();

headless_lms_models::chatbot_page_sync_statuses::update_page_revision_ids(
conn,
page_id_to_latest_history_id,
page_revision_map,
)
.await?;

Ok(())
}

fn page_to_blob_path(page: &Page) -> anyhow::Result<String> {
let base_path = if let Some(course_id) = page.course_id {
PathBuf::from(course_id.to_string())
} else {
return Err(anyhow::anyhow!(
"Trying to sync a page that does not belong to a course."
));
};
let mut url_path = page.url_path.clone();
if url_path.starts_with('/') {
url_path = url_path[1..].to_string();
}
/// Generates the blob storage path for a given page.
fn generate_blob_path(page: &Page) -> anyhow::Result<String> {
let course_id = page
.course_id
.ok_or_else(|| anyhow::anyhow!("Page {} does not belong to any course.", page.id))?;

let mut url_path = page.url_path.trim_start_matches('/').to_string();
if url_path.is_empty() {
url_path = "index".to_string();
}
Ok(format!("{}/{}.json", base_path.to_string_lossy(), url_path))

Ok(format!("{}/{}.json", course_id, url_path))
}
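
For reference, the path scheme produced above is `<course_id>/<url_path>.json`, with any leading slash stripped and an empty path mapped to `index`. A minimal standalone sketch of that mapping (the `blob_path` helper below is illustrative and takes plain strings instead of a `Page`):

```rust
/// Illustrative helper (not in the codebase): mirrors the blob path scheme
/// "<course_id>/<url_path>.json", stripping leading "/" characters and using
/// "index" for the course front page.
fn blob_path(course_id: &str, url_path: &str) -> String {
    let mut path = url_path.trim_start_matches('/').to_string();
    if path.is_empty() {
        path = "index".to_string();
    }
    format!("{}/{}.json", course_id, path)
}

fn main() {
    let course_id = "course-uuid"; // placeholder for a real course UUID
    assert_eq!(
        blob_path(course_id, "/chapter-1/page-1"),
        "course-uuid/chapter-1/page-1.json"
    );
    // The front page has an empty url_path and is stored as index.json.
    assert_eq!(blob_path(course_id, ""), "course-uuid/index.json");
}
```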

/** Deletes all files that don't belong to a page. */
/// Deletes files from blob storage that are no longer associated with any page.
async fn delete_old_files(
conn: &mut PgConnection,
course_id: Uuid,
blob_storage_client: &AzureBlobClient,
blob_client: &AzureBlobClient,
) -> anyhow::Result<()> {
let files = blob_storage_client
let existing_files = blob_client
.list_files_with_prefix(&course_id.to_string())
.await?;
let all_pages = headless_lms_models::pages::get_all_by_course_id_and_visibility(
&mut *conn,

let pages = headless_lms_models::pages::get_all_by_course_id_and_visibility(
conn,
course_id,
PageVisibility::Public,
)
.await?;
let allowed_file_paths = all_pages

let allowed_paths: HashSet<String> = pages
.iter()
.map(|page| page_to_blob_path(page))
.collect::<Result<Vec<_>, _>>()?;
for file in files {
if !allowed_file_paths.contains(&file) {
info!("Deleting file: {}", file);
blob_storage_client.delete_file(&file).await?;
.filter_map(|page| generate_blob_path(page).ok())
.collect();

for file in existing_files {
if !allowed_paths.contains(&file) {
info!("Deleting obsolete file: {}", file);
blob_client.delete_file(&file).await?;
}
}

Ok(())
}
