From 6ab5fa6dd552f15dce98387622c94e61c54368b5 Mon Sep 17 00:00:00 2001 From: Stanislas Polu Date: Tue, 29 Oct 2024 08:36:24 +0100 Subject: [PATCH] Batch delete and bump pool (#8285) * batch delete * bump pool to 128 :o --- core/src/databases_store/store.rs | 41 +++++++++++++++++++++++++------ 1 file changed, 34 insertions(+), 7 deletions(-) diff --git a/core/src/databases_store/store.rs b/core/src/databases_store/store.rs index b9b244d8642b..c29ddd59ade6 100644 --- a/core/src/databases_store/store.rs +++ b/core/src/databases_store/store.rs @@ -42,7 +42,7 @@ pub struct PostgresDatabasesStore { impl PostgresDatabasesStore { pub async fn new(db_uri: &str) -> Result { let manager = PostgresConnectionManager::new_from_stringlike(db_uri, NoTls)?; - let pool = Pool::builder().max_size(16).build(manager).await?; + let pool = Pool::builder().max_size(128).build(manager).await?; Ok(Self { pool }) } } @@ -152,15 +152,28 @@ impl DatabasesStore for PostgresDatabasesStore { let pool = self.pool.clone(); let c = pool.get().await?; - // Truncate table if required. + // Truncate table if required. Rows can be numerous so we delete rows in small batches to + // avoid long running operations. if truncate { + let deletion_batch_size: u64 = 512; + let stmt = c .prepare( - "DELETE FROM tables_rows - WHERE table_id = $1", + "DELETE FROM table_rows WHERE id IN ( + SELECT id FROM table_rows WHERE table_id = $1 LIMIT $2 + )", ) .await?; - c.execute(&stmt, &[&table_id]).await?; + + loop { + let deleted_rows = c + .execute(&stmt, &[&table_id, &(deletion_batch_size as i64)]) + .await?; + + if deleted_rows < deletion_batch_size { + break; + } + } } let stmt = c @@ -192,11 +205,25 @@ impl DatabasesStore for PostgresDatabasesStore { let pool = self.pool.clone(); let c = pool.get().await?; + let deletion_batch_size: u64 = 512; + let stmt = c - .prepare("DELETE FROM tables_rows WHERE table_id = $1") + .prepare( + "DELETE FROM table_rows WHERE id IN ( + SELECT id FROM table_rows WHERE table_id = $1 LIMIT $2 + )", + ) .await?; - c.execute(&stmt, &[&table_id]).await?; + loop { + let deleted_rows = c + .execute(&stmt, &[&table_id, &(deletion_batch_size as i64)]) + .await?; + + if deleted_rows < deletion_batch_size { + break; + } + } Ok(()) }