From bbb5bdaaa9b9df97261e63b94bee5e51cdc30868 Mon Sep 17 00:00:00 2001 From: CookiePieWw Date: Thu, 2 Jan 2025 15:33:44 +0800 Subject: [PATCH 01/12] feat: txn for pg kv backend --- src/common/meta/src/error.rs | 17 ++- src/common/meta/src/kv_backend/postgres.rs | 122 +++++++++++++++++++-- src/common/meta/src/kv_backend/txn.rs | 6 +- src/meta-srv/src/bootstrap.rs | 2 +- 4 files changed, 128 insertions(+), 19 deletions(-) diff --git a/src/common/meta/src/error.rs b/src/common/meta/src/error.rs index 82b591d139a6..8c92146a4624 100644 --- a/src/common/meta/src/error.rs +++ b/src/common/meta/src/error.rs @@ -683,6 +683,16 @@ pub enum Error { location: Location, }, + #[cfg(feature = "pg_kvbackend")] + #[snafu(display("Failed to {} Postgres transaction", operation))] + PostgresTransaction { + #[snafu(source)] + error: tokio_postgres::Error, + #[snafu(implicit)] + location: Location, + operation: String, + }, + #[snafu(display( "Datanode table info not found, table id: {}, datanode id: {}", table_id, @@ -794,9 +804,10 @@ impl ErrorExt for Error { | EmptyDdlTasks { .. } => StatusCode::InvalidArguments, #[cfg(feature = "pg_kvbackend")] - PostgresExecution { .. } | CreatePostgresPool { .. } | GetPostgresConnection { .. } => { - StatusCode::Internal - } + PostgresExecution { .. } + | CreatePostgresPool { .. } + | GetPostgresConnection { .. } + | PostgresTransaction { .. } => StatusCode::Internal, Error::DatanodeTableInfoNotFound { .. 
} => StatusCode::Internal, } } diff --git a/src/common/meta/src/kv_backend/postgres.rs b/src/common/meta/src/kv_backend/postgres.rs index f2416671e229..f127e40f5307 100644 --- a/src/common/meta/src/kv_backend/postgres.rs +++ b/src/common/meta/src/kv_backend/postgres.rs @@ -22,11 +22,12 @@ use tokio_postgres::types::ToSql; use tokio_postgres::NoTls; use crate::error::{ - CreatePostgresPoolSnafu, Error, GetPostgresConnectionSnafu, PostgresExecutionSnafu, Result, - StrFromUtf8Snafu, + CreatePostgresPoolSnafu, Error, GetPostgresConnectionSnafu, PostgresExecutionSnafu, + PostgresTransactionSnafu, Result, StrFromUtf8Snafu, }; -use crate::kv_backend::txn::{Txn as KvTxn, TxnResponse as KvTxnResponse}; +use crate::kv_backend::txn::{Txn as KvTxn, TxnOp, TxnOpResponse, TxnResponse as KvTxnResponse}; use crate::kv_backend::{KvBackend, KvBackendRef, TxnService}; +use crate::metrics::METRIC_META_TXN_REQUEST; use crate::rpc::store::{ BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest, BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest, @@ -37,6 +38,7 @@ use crate::rpc::KeyValue; /// Posgres backend store for metasrv pub struct PgStore { pool: Pool, + max_txn_ops: usize, } const EMPTY: &[u8] = &[0]; @@ -94,17 +96,17 @@ SELECT k, v FROM prev;"#; impl PgStore { /// Create pgstore impl of KvBackendRef from url. - pub async fn with_url(url: &str) -> Result { + pub async fn with_url(url: &str, max_txn_ops: usize) -> Result { let mut cfg = Config::new(); cfg.url = Some(url.to_string()); let pool = cfg .create_pool(Some(Runtime::Tokio1), NoTls) .context(CreatePostgresPoolSnafu)?; - Self::with_pg_pool(pool).await + Self::with_pg_pool(pool, max_txn_ops).await } /// Create pgstore impl of KvBackendRef from tokio-postgres client. - pub async fn with_pg_pool(pool: Pool) -> Result { + pub async fn with_pg_pool(pool: Pool, max_txn_ops: usize) -> Result { // This step ensures the postgres metadata backend is ready to use. 
// We check if greptime_metakv table exists, and we will create a new table // if it does not exist. @@ -121,7 +123,7 @@ impl PgStore { .execute(METADKV_CREATION, &[]) .await .context(PostgresExecutionSnafu)?; - Ok(Arc::new(Self { pool })) + Ok(Arc::new(Self { pool, max_txn_ops })) } async fn get_client(&self) -> Result> { @@ -544,17 +546,110 @@ impl KvBackend for PgStore { } } +impl PgStore { + async fn point_get(&self, key: &Vec) -> Result>> { + let key = process_bytes(key, "pointGetKey")?; + let res = self + .get_client() + .await? + .query(POINT_GET, &[&key]) + .await + .context(PostgresExecutionSnafu)?; + match res.is_empty() { + true => Ok(None), + false => Ok(Some(res[0].get::<_, Vec>(1))), + } + } + + async fn execute_txn_op(&self, op: TxnOp) -> Result { + match op { + TxnOp::Put(key, value) => { + let res = self + .put(PutRequest { + key: key, + value: value, + prev_kv: false, + }) + .await?; + Ok(TxnOpResponse::ResponsePut(res)) + } + TxnOp::Get(key) => { + let res = self + .range(RangeRequest { + key: key, + range_end: vec![], + limit: 1, + keys_only: false, + }) + .await?; + Ok(TxnOpResponse::ResponseGet(res)) + } + TxnOp::Delete(key) => { + let res = self + .delete_range(DeleteRangeRequest { + key: key, + range_end: vec![], + prev_kv: false, + }) + .await?; + Ok(TxnOpResponse::ResponseDelete(res)) + } + } + } +} + #[async_trait::async_trait] impl TxnService for PgStore { type Error = Error; - async fn txn(&self, _txn: KvTxn) -> Result { - // TODO: implement txn for pg kv backend. 
- unimplemented!() + async fn txn(&self, txn: KvTxn) -> Result { + let _timer = METRIC_META_TXN_REQUEST + .with_label_values(&["etcd", "txn"]) + .start_timer(); + + let mut client = self.get_client().await?; + let pg_txn = client + .transaction() + .await + .context(PostgresTransactionSnafu { + operation: "start".to_string(), + })?; + let mut success = true; + if txn.c_when { + for cmp in txn.req.compare { + let value = self.point_get(&cmp.key).await?; + if cmp.compare_value(value.as_ref()) { + success = true; + } else { + success = false; + break; + } + } + } + let mut responses = vec![]; + if success && txn.c_then { + for txnop in txn.req.success { + let res = self.execute_txn_op(txnop).await?; + responses.push(res); + } + } else if !success && txn.c_else { + for txnop in txn.req.failure { + let res = self.execute_txn_op(txnop).await?; + responses.push(res); + } + } + + pg_txn.commit().await.context(PostgresTransactionSnafu { + operation: "commit".to_string(), + })?; + Ok(KvTxnResponse { + responses, + succeeded: success, + }) } fn max_txn_ops(&self) -> usize { - unreachable!("postgres backend does not support max_txn_ops!") + self.max_txn_ops } } @@ -598,7 +693,10 @@ mod tests { .await .context(PostgresExecutionSnafu) .unwrap(); - Some(PgStore { pool }) + Some(PgStore { + pool, + max_txn_ops: 128, + }) } #[tokio::test] diff --git a/src/common/meta/src/kv_backend/txn.rs b/src/common/meta/src/kv_backend/txn.rs index ea3e95aa3ca6..19d9479368ca 100644 --- a/src/common/meta/src/kv_backend/txn.rs +++ b/src/common/meta/src/kv_backend/txn.rs @@ -131,9 +131,9 @@ pub struct TxnResponse { pub struct Txn { // HACK - chroot would modify this field pub(super) req: TxnRequest, - c_when: bool, - c_then: bool, - c_else: bool, + pub(super) c_when: bool, + pub(super) c_then: bool, + pub(super) c_else: bool, } #[cfg(any(test, feature = "testing"))] diff --git a/src/meta-srv/src/bootstrap.rs b/src/meta-srv/src/bootstrap.rs index 11b29f288506..91a58e7d5be7 100644 --- 
a/src/meta-srv/src/bootstrap.rs +++ b/src/meta-srv/src/bootstrap.rs @@ -229,7 +229,7 @@ pub async fn metasrv_builder( #[cfg(feature = "pg_kvbackend")] (None, BackendImpl::PostgresStore) => { let pool = create_postgres_pool(opts).await?; - let kv_backend = PgStore::with_pg_pool(pool) + let kv_backend = PgStore::with_pg_pool(pool, opts.max_txn_ops) .await .context(error::KvBackendSnafu)?; // Client for election should be created separately since we need a different session keep-alive idle time. From 646d20b9e1935e46df13162d1625db1c07878b88 Mon Sep 17 00:00:00 2001 From: CookiePieWw Date: Thu, 2 Jan 2025 15:43:20 +0800 Subject: [PATCH 02/12] chore: clippy --- src/common/meta/src/kv_backend/postgres.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/common/meta/src/kv_backend/postgres.rs b/src/common/meta/src/kv_backend/postgres.rs index f127e40f5307..1cd7f8cb3b30 100644 --- a/src/common/meta/src/kv_backend/postgres.rs +++ b/src/common/meta/src/kv_backend/postgres.rs @@ -547,7 +547,7 @@ impl KvBackend for PgStore { } impl PgStore { - async fn point_get(&self, key: &Vec) -> Result>> { + async fn point_get(&self, key: &[u8]) -> Result>> { let key = process_bytes(key, "pointGetKey")?; let res = self .get_client() @@ -566,8 +566,8 @@ impl PgStore { TxnOp::Put(key, value) => { let res = self .put(PutRequest { - key: key, - value: value, + key, + value, prev_kv: false, }) .await?; @@ -576,7 +576,7 @@ impl PgStore { TxnOp::Get(key) => { let res = self .range(RangeRequest { - key: key, + key, range_end: vec![], limit: 1, keys_only: false, @@ -587,7 +587,7 @@ impl PgStore { TxnOp::Delete(key) => { let res = self .delete_range(DeleteRangeRequest { - key: key, + key, range_end: vec![], prev_kv: false, }) From 40381a16e297907e82914cbe8b3fee3a6d734034 Mon Sep 17 00:00:00 2001 From: CookiePieWw Date: Thu, 2 Jan 2025 16:32:06 +0800 Subject: [PATCH 03/12] fix: txn uses one client --- src/common/meta/src/kv_backend/postgres.rs | 293 
++++++++++++++------- 1 file changed, 195 insertions(+), 98 deletions(-) diff --git a/src/common/meta/src/kv_backend/postgres.rs b/src/common/meta/src/kv_backend/postgres.rs index 1cd7f8cb3b30..30ddd93a3582 100644 --- a/src/common/meta/src/kv_backend/postgres.rs +++ b/src/common/meta/src/kv_backend/postgres.rs @@ -35,6 +35,43 @@ use crate::rpc::store::{ }; use crate::rpc::KeyValue; +type PgClient = deadpool::managed::Object; + +enum PgQueryExecutor<'a> { + Client(PgClient), + Transaction(deadpool_postgres::Transaction<'a>), +} + +impl PgQueryExecutor<'_> { + async fn query( + &self, + query: &str, + params: &[&(dyn ToSql + Sync)], + ) -> Result> { + match self { + PgQueryExecutor::Client(client) => client + .query(query, params) + .await + .context(PostgresExecutionSnafu), + PgQueryExecutor::Transaction(txn) => txn + .query(query, params) + .await + .context(PostgresExecutionSnafu), + } + } + + async fn commit(self) -> Result<()> { + match self { + PgQueryExecutor::Client(_) => Ok(()), + PgQueryExecutor::Transaction(txn) => { + txn.commit().await.context(PostgresTransactionSnafu { + operation: "commit".to_string(), + }) + } + } + } +} + /// Posgres backend store for metasrv pub struct PgStore { pool: Pool, @@ -126,7 +163,7 @@ impl PgStore { Ok(Arc::new(Self { pool, max_txn_ops })) } - async fn get_client(&self) -> Result> { + async fn get_client(&self) -> Result { match self.pool.get().await { Ok(client) => Ok(client), Err(e) => GetPostgresConnectionSnafu { @@ -136,13 +173,15 @@ impl PgStore { } } - async fn put_if_not_exists(&self, key: &str, value: &str) -> Result { - let res = self - .get_client() - .await? 
+ async fn put_if_not_exists_with_query_executor( + &self, + query_executor: &PgQueryExecutor<'_>, + key: &str, + value: &str, + ) -> Result { + let res = query_executor .query(PUT_IF_NOT_EXISTS, &[&key, &value]) - .await - .context(PostgresExecutionSnafu)?; + .await?; Ok(res.is_empty()) } } @@ -249,6 +288,68 @@ impl KvBackend for PgStore { } async fn range(&self, req: RangeRequest) -> Result { + let client = self.get_client().await?; + self.range_with_query_executor(&PgQueryExecutor::Client(client), req) + .await + } + + async fn put(&self, req: PutRequest) -> Result { + let client = self.get_client().await?; + self.put_with_query_executor(&PgQueryExecutor::Client(client), req) + .await + } + + async fn batch_put(&self, req: BatchPutRequest) -> Result { + let client = self.get_client().await?; + self.batch_put_with_query_executor(&PgQueryExecutor::Client(client), req) + .await + } + + async fn batch_get(&self, req: BatchGetRequest) -> Result { + let client = self.get_client().await?; + self.batch_get_with_query_executor(&PgQueryExecutor::Client(client), req) + .await + } + + async fn delete_range(&self, req: DeleteRangeRequest) -> Result { + let client = self.get_client().await?; + self.delete_range_with_query_executor(&PgQueryExecutor::Client(client), req) + .await + } + + async fn batch_delete(&self, req: BatchDeleteRequest) -> Result { + let client = self.get_client().await?; + self.batch_delete_with_query_executor(&PgQueryExecutor::Client(client), req) + .await + } + + async fn compare_and_put(&self, req: CompareAndPutRequest) -> Result { + let client = self.get_client().await?; + self.compare_and_put_with_query_executor(&PgQueryExecutor::Client(client), req) + .await + } +} + +impl PgStore { + /// Point get with certain client. It's needed for a client with transaction. 
+ async fn point_get_with_query_executor( + &self, + query_executor: &PgQueryExecutor<'_>, + key: &[u8], + ) -> Result>> { + let key = process_bytes(key, "pointGetKey")?; + let res = query_executor.query(POINT_GET, &[&key]).await?; + match res.is_empty() { + true => Ok(None), + false => Ok(Some(res[0].get::<_, Vec>(1))), + } + } + + async fn range_with_query_executor( + &self, + query_executor: &PgQueryExecutor<'_>, + req: RangeRequest, + ) -> Result { let mut params = vec![]; let template = select_range_template(&req); if req.key != EMPTY { @@ -277,12 +378,7 @@ impl KvBackend for PgStore { Cow::Owned(owned) => owned as &(dyn ToSql + Sync), }) .collect(); - let res = self - .get_client() - .await? - .query(&template, ¶ms) - .await - .context(PostgresExecutionSnafu)?; + let res = query_executor.query(&template, ¶ms).await?; let kvs: Vec = res .into_iter() .map(|r| { @@ -310,16 +406,23 @@ impl KvBackend for PgStore { }) } - async fn put(&self, req: PutRequest) -> Result { + async fn put_with_query_executor( + &self, + query_executor: &PgQueryExecutor<'_>, + req: PutRequest, + ) -> Result { let kv = KeyValue { key: req.key, value: req.value, }; let mut res = self - .batch_put(BatchPutRequest { - kvs: vec![kv], - prev_kv: req.prev_kv, - }) + .batch_put_with_query_executor( + query_executor, + BatchPutRequest { + kvs: vec![kv], + prev_kv: req.prev_kv, + }, + ) .await?; if !res.prev_kvs.is_empty() { @@ -330,7 +433,11 @@ impl KvBackend for PgStore { Ok(PutResponse { prev_kv: None }) } - async fn batch_put(&self, req: BatchPutRequest) -> Result { + async fn batch_put_with_query_executor( + &self, + query_executor: &PgQueryExecutor<'_>, + req: BatchPutRequest, + ) -> Result { let mut in_params = Vec::with_capacity(req.kvs.len()); let mut values_params = Vec::with_capacity(req.kvs.len() * 2); @@ -348,12 +455,7 @@ impl KvBackend for PgStore { let query = generate_batch_upsert_query(req.kvs.len()); - let res = self - .get_client() - .await? 
- .query(&query, ¶ms) - .await - .context(PostgresExecutionSnafu)?; + let res = query_executor.query(&query, ¶ms).await?; if req.prev_kv { let kvs: Vec = res .into_iter() @@ -373,7 +475,12 @@ impl KvBackend for PgStore { Ok(BatchPutResponse { prev_kvs: vec![] }) } - async fn batch_get(&self, req: BatchGetRequest) -> Result { + /// Batch get with certain client. It's needed for a client with transaction. + async fn batch_get_with_query_executor( + &self, + query_executor: &PgQueryExecutor<'_>, + req: BatchGetRequest, + ) -> Result { if req.keys.is_empty() { return Ok(BatchGetResponse { kvs: vec![] }); } @@ -388,12 +495,7 @@ impl KvBackend for PgStore { .map(|x| x as &(dyn ToSql + Sync)) .collect(); - let res = self - .get_client() - .await? - .query(&query, ¶ms) - .await - .context(PostgresExecutionSnafu)?; + let res = query_executor.query(&query, ¶ms).await?; let kvs: Vec = res .into_iter() .map(|r| { @@ -408,7 +510,11 @@ impl KvBackend for PgStore { Ok(BatchGetResponse { kvs }) } - async fn delete_range(&self, req: DeleteRangeRequest) -> Result { + async fn delete_range_with_query_executor( + &self, + query_executor: &PgQueryExecutor<'_>, + req: DeleteRangeRequest, + ) -> Result { let mut params = vec![]; let template = select_range_delete_template(&req); if req.key != EMPTY { @@ -432,12 +538,7 @@ impl KvBackend for PgStore { }) .collect(); - let res = self - .get_client() - .await? 
- .query(template, ¶ms) - .await - .context(PostgresExecutionSnafu)?; + let res = query_executor.query(template, ¶ms).await?; let deleted = res.len() as i64; if !req.prev_kv { return Ok({ @@ -464,7 +565,11 @@ impl KvBackend for PgStore { }) } - async fn batch_delete(&self, req: BatchDeleteRequest) -> Result { + async fn batch_delete_with_query_executor( + &self, + query_executor: &PgQueryExecutor<'_>, + req: BatchDeleteRequest, + ) -> Result { if req.keys.is_empty() { return Ok(BatchDeleteResponse { prev_kvs: vec![] }); } @@ -479,12 +584,7 @@ impl KvBackend for PgStore { .map(|x| x as &(dyn ToSql + Sync)) .collect(); - let res = self - .get_client() - .await? - .query(&query, ¶ms) - .await - .context(PostgresExecutionSnafu)?; + let res = query_executor.query(&query, ¶ms).await?; if !req.prev_kv { return Ok(BatchDeleteResponse { prev_kvs: vec![] }); } @@ -502,11 +602,17 @@ impl KvBackend for PgStore { Ok(BatchDeleteResponse { prev_kvs: kvs }) } - async fn compare_and_put(&self, req: CompareAndPutRequest) -> Result { + async fn compare_and_put_with_query_executor( + &self, + query_executor: &PgQueryExecutor<'_>, + req: CompareAndPutRequest, + ) -> Result { let key = process_bytes(&req.key, "CASKey")?; let value = process_bytes(&req.value, "CASValue")?; if req.expect.is_empty() { - let put_res = self.put_if_not_exists(key, value).await?; + let put_res = self + .put_if_not_exists_with_query_executor(query_executor, key, value) + .await?; return Ok(CompareAndPutResponse { success: put_res, prev_kv: None, @@ -514,12 +620,7 @@ impl KvBackend for PgStore { } let expect = process_bytes(&req.expect, "CASExpect")?; - let res = self - .get_client() - .await? 
- .query(CAS, &[&key, &value, &expect]) - .await - .context(PostgresExecutionSnafu)?; + let res = query_executor.query(CAS, &[&key, &value, &expect]).await?; match res.is_empty() { true => Ok(CompareAndPutResponse { success: false, @@ -544,53 +645,50 @@ impl KvBackend for PgStore { } } } -} -impl PgStore { - async fn point_get(&self, key: &[u8]) -> Result>> { - let key = process_bytes(key, "pointGetKey")?; - let res = self - .get_client() - .await? - .query(POINT_GET, &[&key]) - .await - .context(PostgresExecutionSnafu)?; - match res.is_empty() { - true => Ok(None), - false => Ok(Some(res[0].get::<_, Vec>(1))), - } - } - - async fn execute_txn_op(&self, op: TxnOp) -> Result { + async fn execute_txn_op( + &self, + query_executor: &PgQueryExecutor<'_>, + op: TxnOp, + ) -> Result { match op { TxnOp::Put(key, value) => { let res = self - .put(PutRequest { - key, - value, - prev_kv: false, - }) + .put_with_query_executor( + query_executor, + PutRequest { + key, + value, + prev_kv: false, + }, + ) .await?; Ok(TxnOpResponse::ResponsePut(res)) } TxnOp::Get(key) => { let res = self - .range(RangeRequest { - key, - range_end: vec![], - limit: 1, - keys_only: false, - }) + .range_with_query_executor( + query_executor, + RangeRequest { + key, + range_end: vec![], + limit: 1, + keys_only: false, + }, + ) .await?; Ok(TxnOpResponse::ResponseGet(res)) } TxnOp::Delete(key) => { let res = self - .delete_range(DeleteRangeRequest { - key, - range_end: vec![], - prev_kv: false, - }) + .delete_range_with_query_executor( + query_executor, + DeleteRangeRequest { + key, + range_end: vec![], + prev_kv: false, + }, + ) .await?; Ok(TxnOpResponse::ResponseDelete(res)) } @@ -608,16 +706,17 @@ impl TxnService for PgStore { .start_timer(); let mut client = self.get_client().await?; - let pg_txn = client - .transaction() - .await - .context(PostgresTransactionSnafu { + let pg_txn = PgQueryExecutor::Transaction(client.transaction().await.context( + PostgresTransactionSnafu { operation: 
"start".to_string(), - })?; + }, + )?); let mut success = true; if txn.c_when { for cmp in txn.req.compare { - let value = self.point_get(&cmp.key).await?; + let value = self + .point_get_with_query_executor(&pg_txn, &cmp.key) + .await?; if cmp.compare_value(value.as_ref()) { success = true; } else { @@ -629,19 +728,17 @@ impl TxnService for PgStore { let mut responses = vec![]; if success && txn.c_then { for txnop in txn.req.success { - let res = self.execute_txn_op(txnop).await?; + let res = self.execute_txn_op(&pg_txn, txnop).await?; responses.push(res); } } else if !success && txn.c_else { for txnop in txn.req.failure { - let res = self.execute_txn_op(txnop).await?; + let res = self.execute_txn_op(&pg_txn, txnop).await?; responses.push(res); } } - pg_txn.commit().await.context(PostgresTransactionSnafu { - operation: "commit".to_string(), - })?; + pg_txn.commit().await?; Ok(KvTxnResponse { responses, succeeded: success, From 560c036ff31ad21f4f5e5ce20bbbfca1d603de62 Mon Sep 17 00:00:00 2001 From: CookiePieWw Date: Fri, 3 Jan 2025 11:29:30 +0800 Subject: [PATCH 04/12] test: clean up and txn test --- src/common/meta/src/kv_backend/etcd.rs | 15 ++ src/common/meta/src/kv_backend/memory.rs | 15 +- src/common/meta/src/kv_backend/postgres.rs | 22 +- src/common/meta/src/kv_backend/test.rs | 206 ++++++++++++++++++ src/common/meta/src/kv_backend/txn.rs | 235 --------------------- src/meta-srv/src/election/postgres.rs | 12 ++ 6 files changed, 268 insertions(+), 237 deletions(-) diff --git a/src/common/meta/src/kv_backend/etcd.rs b/src/common/meta/src/kv_backend/etcd.rs index a787940b6df0..213489a583c7 100644 --- a/src/common/meta/src/kv_backend/etcd.rs +++ b/src/common/meta/src/kv_backend/etcd.rs @@ -542,6 +542,8 @@ mod tests { prepare_kv_with_prefix, test_kv_batch_delete_with_prefix, test_kv_batch_get_with_prefix, test_kv_compare_and_put_with_prefix, test_kv_delete_range_with_prefix, test_kv_put_with_prefix, test_kv_range_2_with_prefix, test_kv_range_with_prefix, + 
test_txn_compare_equal, test_txn_compare_greater, test_txn_compare_less, + test_txn_compare_not_equal, test_txn_one_compare_op, text_txn_multi_compare_op, unprepare_kv, }; @@ -628,4 +630,17 @@ mod tests { test_kv_batch_delete_with_prefix(kv_backend, prefix.to_vec()).await; } } + + #[tokio::test] + async fn test_etcd_txn() { + if let Some(kv_backend) = build_kv_backend().await { + let kv_backend_ref = Arc::new(kv_backend); + test_txn_one_compare_op(kv_backend_ref.clone()).await; + text_txn_multi_compare_op(kv_backend_ref.clone()).await; + test_txn_compare_equal(kv_backend_ref.clone()).await; + test_txn_compare_greater(kv_backend_ref.clone()).await; + test_txn_compare_less(kv_backend_ref.clone()).await; + test_txn_compare_not_equal(kv_backend_ref).await; + } + } } diff --git a/src/common/meta/src/kv_backend/memory.rs b/src/common/meta/src/kv_backend/memory.rs index 9475a30001ce..b236d7b57619 100644 --- a/src/common/meta/src/kv_backend/memory.rs +++ b/src/common/meta/src/kv_backend/memory.rs @@ -325,7 +325,9 @@ mod tests { use crate::error::Error; use crate::kv_backend::test::{ prepare_kv, test_kv_batch_delete, test_kv_batch_get, test_kv_compare_and_put, - test_kv_delete_range, test_kv_put, test_kv_range, test_kv_range_2, + test_kv_delete_range, test_kv_put, test_kv_range, test_kv_range_2, test_txn_compare_equal, + test_txn_compare_greater, test_txn_compare_less, test_txn_compare_not_equal, + test_txn_one_compare_op, text_txn_multi_compare_op, }; async fn mock_mem_store_with_data() -> MemoryKvBackend { @@ -383,4 +385,15 @@ mod tests { test_kv_batch_delete(kv_backend).await; } + + #[tokio::test] + async fn test_memory_txn() { + let kv_backend = Arc::new(MemoryKvBackend::::new()); + test_txn_one_compare_op(kv_backend.clone()).await; + text_txn_multi_compare_op(kv_backend.clone()).await; + test_txn_compare_equal(kv_backend.clone()).await; + test_txn_compare_greater(kv_backend.clone()).await; + test_txn_compare_less(kv_backend.clone()).await; + 
test_txn_compare_not_equal(kv_backend).await; + } } diff --git a/src/common/meta/src/kv_backend/postgres.rs b/src/common/meta/src/kv_backend/postgres.rs index 30ddd93a3582..ca9073d36b56 100644 --- a/src/common/meta/src/kv_backend/postgres.rs +++ b/src/common/meta/src/kv_backend/postgres.rs @@ -341,7 +341,12 @@ impl PgStore { let res = query_executor.query(POINT_GET, &[&key]).await?; match res.is_empty() { true => Ok(None), - false => Ok(Some(res[0].get::<_, Vec>(1))), + false => { + // Safety: We are sure that the row is not empty. + let row = res.first().unwrap(); + let value: String = row.try_get(1).context(PostgresExecutionSnafu)?; + Ok(Some(value.into_bytes())) + } } } @@ -769,6 +774,8 @@ mod tests { prepare_kv_with_prefix, test_kv_batch_delete_with_prefix, test_kv_batch_get_with_prefix, test_kv_compare_and_put_with_prefix, test_kv_delete_range_with_prefix, test_kv_put_with_prefix, test_kv_range_2_with_prefix, test_kv_range_with_prefix, + test_txn_compare_equal, test_txn_compare_greater, test_txn_compare_less, + test_txn_compare_not_equal, test_txn_one_compare_op, text_txn_multi_compare_op, unprepare_kv, }; @@ -858,4 +865,17 @@ mod tests { test_kv_batch_delete_with_prefix(kv_backend, prefix.to_vec()).await; } } + + #[tokio::test] + async fn test_pg_txn() { + if let Some(kv_backend) = build_pg_kv_backend().await { + let kv_backend_ref = Arc::new(kv_backend); + test_txn_one_compare_op(kv_backend_ref.clone()).await; + text_txn_multi_compare_op(kv_backend_ref.clone()).await; + test_txn_compare_equal(kv_backend_ref.clone()).await; + test_txn_compare_greater(kv_backend_ref.clone()).await; + test_txn_compare_less(kv_backend_ref.clone()).await; + test_txn_compare_not_equal(kv_backend_ref).await; + } + } } diff --git a/src/common/meta/src/kv_backend/test.rs b/src/common/meta/src/kv_backend/test.rs index 2f0216dfdfcb..d428b6ed224e 100644 --- a/src/common/meta/src/kv_backend/test.rs +++ b/src/common/meta/src/kv_backend/test.rs @@ -15,6 +15,8 @@ use 
std::sync::atomic::{AtomicU8, Ordering}; use std::sync::Arc; +use txn::{Compare, CompareOp, TxnOp}; + use super::{KvBackend, *}; use crate::error::Error; use crate::rpc::store::{BatchGetRequest, PutRequest}; @@ -444,3 +446,207 @@ pub async fn test_kv_batch_delete_with_prefix(kv_backend: impl KvBackend, prefix assert!(kv_backend.get(&key3).await.unwrap().is_none()); assert!(kv_backend.get(&key11).await.unwrap().is_none()); } + +pub async fn test_txn_one_compare_op(kv_backend: KvBackendRef) { + let _ = kv_backend + .put(PutRequest { + key: vec![11], + value: vec![3], + ..Default::default() + }) + .await + .unwrap(); + + let txn = Txn::new() + .when(vec![Compare::with_value( + vec![11], + CompareOp::Greater, + vec![1], + )]) + .and_then(vec![TxnOp::Put(vec![11], vec![1])]) + .or_else(vec![TxnOp::Put(vec![11], vec![2])]); + + let txn_response = kv_backend.txn(txn).await.unwrap(); + + assert!(txn_response.succeeded); + assert_eq!(txn_response.responses.len(), 1); +} + +pub async fn text_txn_multi_compare_op(kv_backend: KvBackendRef) { + for i in 1..3 { + let _ = kv_backend + .put(PutRequest { + key: vec![i], + value: vec![i], + ..Default::default() + }) + .await + .unwrap(); + } + + let when: Vec<_> = (1..3u8) + .map(|i| Compare::with_value(vec![i], CompareOp::Equal, vec![i])) + .collect(); + + let txn = Txn::new() + .when(when) + .and_then(vec![ + TxnOp::Put(vec![1], vec![10]), + TxnOp::Put(vec![2], vec![20]), + ]) + .or_else(vec![TxnOp::Put(vec![1], vec![11])]); + + let txn_response = kv_backend.txn(txn).await.unwrap(); + + assert!(txn_response.succeeded); + assert_eq!(txn_response.responses.len(), 2); +} + +pub async fn test_txn_compare_equal(kv_backend: KvBackendRef) { + let key = vec![101u8]; + kv_backend.delete(&key, false).await.unwrap(); + + let txn = Txn::new() + .when(vec![Compare::with_value_not_exists( + key.clone(), + CompareOp::Equal, + )]) + .and_then(vec![TxnOp::Put(key.clone(), vec![1])]) + .or_else(vec![TxnOp::Put(key.clone(), vec![2])]); + let 
txn_response = kv_backend.txn(txn.clone()).await.unwrap(); + assert!(txn_response.succeeded); + + let txn_response = kv_backend.txn(txn).await.unwrap(); + assert!(!txn_response.succeeded); + + let txn = Txn::new() + .when(vec![Compare::with_value( + key.clone(), + CompareOp::Equal, + vec![2], + )]) + .and_then(vec![TxnOp::Put(key.clone(), vec![3])]) + .or_else(vec![TxnOp::Put(key, vec![4])]); + let txn_response = kv_backend.txn(txn).await.unwrap(); + assert!(txn_response.succeeded); +} + +pub async fn test_txn_compare_greater(kv_backend: KvBackendRef) { + let key = vec![102u8]; + kv_backend.delete(&key, false).await.unwrap(); + + let txn = Txn::new() + .when(vec![Compare::with_value_not_exists( + key.clone(), + CompareOp::Greater, + )]) + .and_then(vec![TxnOp::Put(key.clone(), vec![1])]) + .or_else(vec![TxnOp::Put(key.clone(), vec![2])]); + let txn_response = kv_backend.txn(txn.clone()).await.unwrap(); + assert!(!txn_response.succeeded); + + let txn_response = kv_backend.txn(txn).await.unwrap(); + assert!(txn_response.succeeded); + + let txn = Txn::new() + .when(vec![Compare::with_value( + key.clone(), + CompareOp::Greater, + vec![1], + )]) + .and_then(vec![TxnOp::Put(key.clone(), vec![3])]) + .or_else(vec![TxnOp::Get(key.clone())]); + let mut txn_response = kv_backend.txn(txn).await.unwrap(); + assert!(!txn_response.succeeded); + let res = txn_response.responses.pop().unwrap(); + assert_eq!( + res, + TxnOpResponse::ResponseGet(RangeResponse { + kvs: vec![KeyValue { + key, + value: vec![1] + }], + more: false, + }) + ); +} + +pub async fn test_txn_compare_less(kv_backend: KvBackendRef) { + let key = vec![103u8]; + kv_backend.delete(&[3], false).await.unwrap(); + + let txn = Txn::new() + .when(vec![Compare::with_value_not_exists( + key.clone(), + CompareOp::Less, + )]) + .and_then(vec![TxnOp::Put(key.clone(), vec![1])]) + .or_else(vec![TxnOp::Put(key.clone(), vec![2])]); + let txn_response = kv_backend.txn(txn.clone()).await.unwrap(); + 
assert!(!txn_response.succeeded); + + let txn_response = kv_backend.txn(txn).await.unwrap(); + assert!(!txn_response.succeeded); + + let txn = Txn::new() + .when(vec![Compare::with_value( + key.clone(), + CompareOp::Less, + vec![2], + )]) + .and_then(vec![TxnOp::Put(key.clone(), vec![3])]) + .or_else(vec![TxnOp::Get(key.clone())]); + let mut txn_response = kv_backend.txn(txn).await.unwrap(); + assert!(!txn_response.succeeded); + let res = txn_response.responses.pop().unwrap(); + assert_eq!( + res, + TxnOpResponse::ResponseGet(RangeResponse { + kvs: vec![KeyValue { + key, + value: vec![2] + }], + more: false, + }) + ); +} + +pub async fn test_txn_compare_not_equal(kv_backend: KvBackendRef) { + let key = vec![104u8]; + kv_backend.delete(&key, false).await.unwrap(); + + let txn = Txn::new() + .when(vec![Compare::with_value_not_exists( + key.clone(), + CompareOp::NotEqual, + )]) + .and_then(vec![TxnOp::Put(key.clone(), vec![1])]) + .or_else(vec![TxnOp::Put(key.clone(), vec![2])]); + let txn_response = kv_backend.txn(txn.clone()).await.unwrap(); + assert!(!txn_response.succeeded); + + let txn_response = kv_backend.txn(txn).await.unwrap(); + assert!(txn_response.succeeded); + + let txn = Txn::new() + .when(vec![Compare::with_value( + key.clone(), + CompareOp::Equal, + vec![2], + )]) + .and_then(vec![TxnOp::Put(key.clone(), vec![3])]) + .or_else(vec![TxnOp::Get(key.clone())]); + let mut txn_response = kv_backend.txn(txn).await.unwrap(); + assert!(!txn_response.succeeded); + let res = txn_response.responses.pop().unwrap(); + assert_eq!( + res, + TxnOpResponse::ResponseGet(RangeResponse { + kvs: vec![KeyValue { + key, + value: vec![1] + }], + more: false, + }) + ); +} diff --git a/src/common/meta/src/kv_backend/txn.rs b/src/common/meta/src/kv_backend/txn.rs index 19d9479368ca..55812b170f7f 100644 --- a/src/common/meta/src/kv_backend/txn.rs +++ b/src/common/meta/src/kv_backend/txn.rs @@ -241,14 +241,7 @@ impl From for TxnRequest { #[cfg(test)] mod tests { - use 
std::sync::Arc; - use super::*; - use crate::error::Error; - use crate::kv_backend::memory::MemoryKvBackend; - use crate::kv_backend::KvBackendRef; - use crate::rpc::store::PutRequest; - use crate::rpc::KeyValue; #[test] fn test_compare() { @@ -310,232 +303,4 @@ mod tests { } ); } - - #[tokio::test] - async fn test_txn_one_compare_op() { - let kv_backend = create_kv_backend().await; - - let _ = kv_backend - .put(PutRequest { - key: vec![11], - value: vec![3], - ..Default::default() - }) - .await - .unwrap(); - - let txn = Txn::new() - .when(vec![Compare::with_value( - vec![11], - CompareOp::Greater, - vec![1], - )]) - .and_then(vec![TxnOp::Put(vec![11], vec![1])]) - .or_else(vec![TxnOp::Put(vec![11], vec![2])]); - - let txn_response = kv_backend.txn(txn).await.unwrap(); - - assert!(txn_response.succeeded); - assert_eq!(txn_response.responses.len(), 1); - } - - #[tokio::test] - async fn test_txn_multi_compare_op() { - let kv_backend = create_kv_backend().await; - - for i in 1..3 { - let _ = kv_backend - .put(PutRequest { - key: vec![i], - value: vec![i], - ..Default::default() - }) - .await - .unwrap(); - } - - let when: Vec<_> = (1..3u8) - .map(|i| Compare::with_value(vec![i], CompareOp::Equal, vec![i])) - .collect(); - - let txn = Txn::new() - .when(when) - .and_then(vec![ - TxnOp::Put(vec![1], vec![10]), - TxnOp::Put(vec![2], vec![20]), - ]) - .or_else(vec![TxnOp::Put(vec![1], vec![11])]); - - let txn_response = kv_backend.txn(txn).await.unwrap(); - - assert!(txn_response.succeeded); - assert_eq!(txn_response.responses.len(), 2); - } - - #[tokio::test] - async fn test_txn_compare_equal() { - let kv_backend = create_kv_backend().await; - let key = vec![101u8]; - kv_backend.delete(&key, false).await.unwrap(); - - let txn = Txn::new() - .when(vec![Compare::with_value_not_exists( - key.clone(), - CompareOp::Equal, - )]) - .and_then(vec![TxnOp::Put(key.clone(), vec![1])]) - .or_else(vec![TxnOp::Put(key.clone(), vec![2])]); - let txn_response = 
kv_backend.txn(txn.clone()).await.unwrap(); - assert!(txn_response.succeeded); - - let txn_response = kv_backend.txn(txn).await.unwrap(); - assert!(!txn_response.succeeded); - - let txn = Txn::new() - .when(vec![Compare::with_value( - key.clone(), - CompareOp::Equal, - vec![2], - )]) - .and_then(vec![TxnOp::Put(key.clone(), vec![3])]) - .or_else(vec![TxnOp::Put(key, vec![4])]); - let txn_response = kv_backend.txn(txn).await.unwrap(); - assert!(txn_response.succeeded); - } - - #[tokio::test] - async fn test_txn_compare_greater() { - let kv_backend = create_kv_backend().await; - let key = vec![102u8]; - kv_backend.delete(&key, false).await.unwrap(); - - let txn = Txn::new() - .when(vec![Compare::with_value_not_exists( - key.clone(), - CompareOp::Greater, - )]) - .and_then(vec![TxnOp::Put(key.clone(), vec![1])]) - .or_else(vec![TxnOp::Put(key.clone(), vec![2])]); - let txn_response = kv_backend.txn(txn.clone()).await.unwrap(); - assert!(!txn_response.succeeded); - - let txn_response = kv_backend.txn(txn).await.unwrap(); - assert!(txn_response.succeeded); - - let txn = Txn::new() - .when(vec![Compare::with_value( - key.clone(), - CompareOp::Greater, - vec![1], - )]) - .and_then(vec![TxnOp::Put(key.clone(), vec![3])]) - .or_else(vec![TxnOp::Get(key.clone())]); - let mut txn_response = kv_backend.txn(txn).await.unwrap(); - assert!(!txn_response.succeeded); - let res = txn_response.responses.pop().unwrap(); - assert_eq!( - res, - TxnOpResponse::ResponseGet(RangeResponse { - kvs: vec![KeyValue { - key, - value: vec![1] - }], - more: false, - }) - ); - } - - #[tokio::test] - async fn test_txn_compare_less() { - let kv_backend = create_kv_backend().await; - let key = vec![103u8]; - kv_backend.delete(&[3], false).await.unwrap(); - - let txn = Txn::new() - .when(vec![Compare::with_value_not_exists( - key.clone(), - CompareOp::Less, - )]) - .and_then(vec![TxnOp::Put(key.clone(), vec![1])]) - .or_else(vec![TxnOp::Put(key.clone(), vec![2])]); - let txn_response = 
kv_backend.txn(txn.clone()).await.unwrap(); - assert!(!txn_response.succeeded); - - let txn_response = kv_backend.txn(txn).await.unwrap(); - assert!(!txn_response.succeeded); - - let txn = Txn::new() - .when(vec![Compare::with_value( - key.clone(), - CompareOp::Less, - vec![2], - )]) - .and_then(vec![TxnOp::Put(key.clone(), vec![3])]) - .or_else(vec![TxnOp::Get(key.clone())]); - let mut txn_response = kv_backend.txn(txn).await.unwrap(); - assert!(!txn_response.succeeded); - let res = txn_response.responses.pop().unwrap(); - assert_eq!( - res, - TxnOpResponse::ResponseGet(RangeResponse { - kvs: vec![KeyValue { - key, - value: vec![2] - }], - more: false, - }) - ); - } - - #[tokio::test] - async fn test_txn_compare_not_equal() { - let kv_backend = create_kv_backend().await; - let key = vec![104u8]; - kv_backend.delete(&key, false).await.unwrap(); - - let txn = Txn::new() - .when(vec![Compare::with_value_not_exists( - key.clone(), - CompareOp::NotEqual, - )]) - .and_then(vec![TxnOp::Put(key.clone(), vec![1])]) - .or_else(vec![TxnOp::Put(key.clone(), vec![2])]); - let txn_response = kv_backend.txn(txn.clone()).await.unwrap(); - assert!(!txn_response.succeeded); - - let txn_response = kv_backend.txn(txn).await.unwrap(); - assert!(txn_response.succeeded); - - let txn = Txn::new() - .when(vec![Compare::with_value( - key.clone(), - CompareOp::Equal, - vec![2], - )]) - .and_then(vec![TxnOp::Put(key.clone(), vec![3])]) - .or_else(vec![TxnOp::Get(key.clone())]); - let mut txn_response = kv_backend.txn(txn).await.unwrap(); - assert!(!txn_response.succeeded); - let res = txn_response.responses.pop().unwrap(); - assert_eq!( - res, - TxnOpResponse::ResponseGet(RangeResponse { - kvs: vec![KeyValue { - key, - value: vec![1] - }], - more: false, - }) - ); - } - - async fn create_kv_backend() -> KvBackendRef { - Arc::new(MemoryKvBackend::::new()) - // TODO(jiachun): Add a feature to test against etcd in github CI - // - // The same test can be run against etcd by uncommenting the 
following line - // crate::service::store::etcd::EtcdStore::with_endpoints(["127.0.0.1:2379"]) - // .await - // .unwrap() - } } diff --git a/src/meta-srv/src/election/postgres.rs b/src/meta-srv/src/election/postgres.rs index 22bde228502e..566860e185d2 100644 --- a/src/meta-srv/src/election/postgres.rs +++ b/src/meta-srv/src/election/postgres.rs @@ -1116,6 +1116,12 @@ mod tests { } _ => panic!("Expected LeaderChangeMessage::StepDown"), } + + leader_pg_election + .client + .query(STEP_DOWN, &[]) + .await + .unwrap(); } #[tokio::test] @@ -1185,5 +1191,11 @@ mod tests { } _ => panic!("Expected LeaderChangeMessage::StepDown"), } + + leader_pg_election + .client + .query(STEP_DOWN, &[]) + .await + .unwrap(); } } From 479f9e53554dd195085ab29b0dfec79a23cd23f7 Mon Sep 17 00:00:00 2001 From: CookiePieWw Date: Fri, 3 Jan 2025 12:07:16 +0800 Subject: [PATCH 05/12] test: clean up --- src/common/meta/src/kv_backend/postgres.rs | 10 +++++++++- src/meta-srv/src/election/postgres.rs | 14 ++++++++++++++ 2 files changed, 23 insertions(+), 1 deletion(-) diff --git a/src/common/meta/src/kv_backend/postgres.rs b/src/common/meta/src/kv_backend/postgres.rs index ca9073d36b56..ac8dd91bbd18 100644 --- a/src/common/meta/src/kv_backend/postgres.rs +++ b/src/common/meta/src/kv_backend/postgres.rs @@ -875,7 +875,15 @@ mod tests { test_txn_compare_equal(kv_backend_ref.clone()).await; test_txn_compare_greater(kv_backend_ref.clone()).await; test_txn_compare_less(kv_backend_ref.clone()).await; - test_txn_compare_not_equal(kv_backend_ref).await; + test_txn_compare_not_equal(kv_backend_ref.clone()).await; + // Clean up + kv_backend_ref + .get_client() + .await + .unwrap() + .execute("DELETE FROM greptime_metakv", &[]) + .await + .unwrap(); } } } diff --git a/src/meta-srv/src/election/postgres.rs b/src/meta-srv/src/election/postgres.rs index 566860e185d2..619f7df7f274 100644 --- a/src/meta-srv/src/election/postgres.rs +++ b/src/meta-srv/src/election/postgres.rs @@ -1117,11 +1117,18 @@ mod tests { _ => 
panic!("Expected LeaderChangeMessage::StepDown"), } + // Clean up leader_pg_election .client .query(STEP_DOWN, &[]) .await .unwrap(); + + leader_pg_election + .client + .query("DELETE FROM greptime_metakv", &[]) + .await + .unwrap(); } #[tokio::test] @@ -1192,10 +1199,17 @@ mod tests { _ => panic!("Expected LeaderChangeMessage::StepDown"), } + // Clean up leader_pg_election .client .query(STEP_DOWN, &[]) .await .unwrap(); + + leader_pg_election + .client + .query("DELETE FROM greptime_metakv", &[]) + .await + .unwrap(); } } From 86292b081a4e24a0a64cddccb2e543f6ff616938 Mon Sep 17 00:00:00 2001 From: CookiePieWw Date: Fri, 3 Jan 2025 13:11:38 +0800 Subject: [PATCH 06/12] test: change lock_id to avoid conflict in test --- src/meta-srv/src/election/postgres.rs | 44 +++++++++++++++++++-------- 1 file changed, 31 insertions(+), 13 deletions(-) diff --git a/src/meta-srv/src/election/postgres.rs b/src/meta-srv/src/election/postgres.rs index 619f7df7f274..29da85f40d22 100644 --- a/src/meta-srv/src/election/postgres.rs +++ b/src/meta-srv/src/election/postgres.rs @@ -35,8 +35,8 @@ use crate::error::{ use crate::metasrv::{ElectionRef, LeaderValue, MetasrvNodeInfo}; // TODO(CookiePie): The lock id should be configurable. -const CAMPAIGN: &str = "SELECT pg_try_advisory_lock(28319)"; -const STEP_DOWN: &str = "SELECT pg_advisory_unlock(28319)"; +const CAMPAIGN: &str = "SELECT pg_try_advisory_lock({})"; +const STEP_DOWN: &str = "SELECT pg_advisory_unlock({})"; const SET_IDLE_SESSION_TIMEOUT: &str = "SET idle_in_transaction_session_timeout = $1"; // Currently the session timeout is longer than the leader lease time, so the leader lease may expire while the session is still alive. // Either the leader reconnects and step down or the session expires and the lock is released. 
@@ -73,6 +73,14 @@ const PREFIX_GET_WITH_CURRENT_TIMESTAMP: &str = r#"SELECT v, TO_CHAR(CURRENT_TIM const POINT_DELETE: &str = "DELETE FROM greptime_metakv WHERE k = $1 RETURNING k,v;"; +fn campaign_sql(lock_id: u64) -> String { + CAMPAIGN.replace("{}", &lock_id.to_string()) +} + +fn step_down_sql(lock_id: u64) -> String { + STEP_DOWN.replace("{}", &lock_id.to_string()) +} + /// Parse the value and expire time from the given string. The value should be in the format "value || LEASE_SEP || expire_time". fn parse_value_and_expire_time(value: &str) -> Result<(String, Timestamp)> { let (value, expire_time) = value @@ -130,6 +138,7 @@ pub struct PgElection { leader_watcher: broadcast::Sender, store_key_prefix: String, candidate_lease_ttl_secs: u64, + lock_id: u64, } impl PgElection { @@ -154,6 +163,8 @@ impl PgElection { leader_watcher: tx, store_key_prefix, candidate_lease_ttl_secs, + // TODO(CookiePie): The lock id should be configurable. + lock_id: 28319, })) } @@ -265,7 +276,7 @@ impl Election for PgElection { loop { let res = self .client - .query(CAMPAIGN, &[]) + .query(&campaign_sql(self.lock_id), &[]) .await .context(PostgresExecutionSnafu)?; if let Some(row) = res.first() { @@ -550,7 +561,7 @@ impl PgElection { { self.delete_value(&key).await?; self.client - .query(STEP_DOWN, &[]) + .query(&step_down_sql(self.lock_id), &[]) .await .context(PostgresExecutionSnafu)?; if let Err(e) = self @@ -659,6 +670,7 @@ mod tests { leader_watcher: tx, store_key_prefix: "test_prefix".to_string(), candidate_lease_ttl_secs: 10, + lock_id: 28319, }; let res = pg_election @@ -728,6 +740,7 @@ mod tests { leader_watcher: tx, store_key_prefix: "test_prefix".to_string(), candidate_lease_ttl_secs, + lock_id: 28319, }; let node_info = MetasrvNodeInfo { @@ -764,6 +777,7 @@ mod tests { leader_watcher: tx, store_key_prefix: "test_prefix".to_string(), candidate_lease_ttl_secs, + lock_id: 28319, }; let candidates = pg_election.all_candidates().await.unwrap(); @@ -804,6 +818,7 @@ mod tests { 
leader_watcher: tx, store_key_prefix: "test_prefix".to_string(), candidate_lease_ttl_secs, + lock_id: 28320, }; leader_pg_election.elected().await.unwrap(); @@ -911,12 +926,13 @@ mod tests { leader_watcher: tx, store_key_prefix: "test_prefix".to_string(), candidate_lease_ttl_secs, + lock_id: 28321, }; // Step 1: No leader exists, campaign and elected. let res = leader_pg_election .client - .query(CAMPAIGN, &[]) + .query(&campaign_sql(leader_pg_election.lock_id), &[]) .await .unwrap(); let res: bool = res[0].get(0); @@ -947,7 +963,7 @@ mod tests { // Step 2: As a leader, renew the lease. let res = leader_pg_election .client - .query(CAMPAIGN, &[]) + .query(&campaign_sql(leader_pg_election.lock_id), &[]) .await .unwrap(); let res: bool = res[0].get(0); @@ -967,7 +983,7 @@ mod tests { let res = leader_pg_election .client - .query(CAMPAIGN, &[]) + .query(&campaign_sql(leader_pg_election.lock_id), &[]) .await .unwrap(); let res: bool = res[0].get(0); @@ -995,7 +1011,7 @@ mod tests { // Step 4: Re-campaign and elected. let res = leader_pg_election .client - .query(CAMPAIGN, &[]) + .query(&campaign_sql(leader_pg_election.lock_id), &[]) .await .unwrap(); let res: bool = res[0].get(0); @@ -1052,7 +1068,7 @@ mod tests { // Step 6: Re-campaign and elected. let res = leader_pg_election .client - .query(CAMPAIGN, &[]) + .query(&campaign_sql(leader_pg_election.lock_id), &[]) .await .unwrap(); let res: bool = res[0].get(0); @@ -1083,7 +1099,7 @@ mod tests { // Step 7: Something wrong, the leader key changed by others. 
let res = leader_pg_election .client - .query(CAMPAIGN, &[]) + .query(&campaign_sql(leader_pg_election.lock_id), &[]) .await .unwrap(); let res: bool = res[0].get(0); @@ -1120,7 +1136,7 @@ mod tests { // Clean up leader_pg_election .client - .query(STEP_DOWN, &[]) + .query(&step_down_sql(leader_pg_election.lock_id), &[]) .await .unwrap(); @@ -1145,6 +1161,7 @@ mod tests { leader_watcher: tx, store_key_prefix: "test_prefix".to_string(), candidate_lease_ttl_secs, + lock_id: 28322, }; let leader_client = create_postgres_client().await.unwrap(); @@ -1157,11 +1174,12 @@ mod tests { leader_watcher: tx, store_key_prefix: "test_prefix".to_string(), candidate_lease_ttl_secs, + lock_id: 28322, }; leader_pg_election .client - .query(CAMPAIGN, &[]) + .query(&campaign_sql(leader_pg_election.lock_id), &[]) .await .unwrap(); leader_pg_election.elected().await.unwrap(); @@ -1202,7 +1220,7 @@ mod tests { // Clean up leader_pg_election .client - .query(STEP_DOWN, &[]) + .query(&step_down_sql(leader_pg_election.lock_id), &[]) .await .unwrap(); From 21abc879f084c64ffc8f326bbc49f0979f9bdefb Mon Sep 17 00:00:00 2001 From: CookiePieWw Date: Fri, 3 Jan 2025 15:02:39 +0800 Subject: [PATCH 07/12] test: use different prefix in pg election test --- src/meta-srv/src/election/postgres.rs | 32 ++++++++++----------------- 1 file changed, 12 insertions(+), 20 deletions(-) diff --git a/src/meta-srv/src/election/postgres.rs b/src/meta-srv/src/election/postgres.rs index 29da85f40d22..ceaf11e77a38 100644 --- a/src/meta-srv/src/election/postgres.rs +++ b/src/meta-srv/src/election/postgres.rs @@ -632,6 +632,7 @@ impl PgElection { mod tests { use std::env; + use rand::Rng; use tokio_postgres::{Client, NoTls}; use super::*; @@ -668,7 +669,7 @@ mod tests { is_leader: AtomicBool::new(false), leader_infancy: AtomicBool::new(true), leader_watcher: tx, - store_key_prefix: "test_prefix".to_string(), + store_key_prefix: rand::thread_rng().gen::().to_string(), candidate_lease_ttl_secs: 10, lock_id: 28319, }; @@ 
-738,7 +739,7 @@ mod tests { is_leader: AtomicBool::new(false), leader_infancy: AtomicBool::new(true), leader_watcher: tx, - store_key_prefix: "test_prefix".to_string(), + store_key_prefix: rand::thread_rng().gen::().to_string(), candidate_lease_ttl_secs, lock_id: 28319, }; @@ -766,6 +767,7 @@ mod tests { tokio::time::sleep(Duration::from_secs(3)).await; let client = create_postgres_client().await.unwrap(); + let store_key_prefix = rand::thread_rng().gen::().to_string(); let (tx, _) = broadcast::channel(100); let leader_value = "test_leader".to_string(); @@ -775,7 +777,7 @@ mod tests { is_leader: AtomicBool::new(false), leader_infancy: AtomicBool::new(true), leader_watcher: tx, - store_key_prefix: "test_prefix".to_string(), + store_key_prefix: store_key_prefix.clone(), candidate_lease_ttl_secs, lock_id: 28319, }; @@ -796,7 +798,7 @@ mod tests { for i in 0..10 { let key = format!( "{}{}{}{}", - "test_prefix", CANDIDATES_ROOT, leader_value_prefix, i + store_key_prefix, CANDIDATES_ROOT, leader_value_prefix, i ); let res = pg_election.delete_value(&key).await.unwrap(); assert!(res); @@ -816,7 +818,7 @@ mod tests { is_leader: AtomicBool::new(false), leader_infancy: AtomicBool::new(true), leader_watcher: tx, - store_key_prefix: "test_prefix".to_string(), + store_key_prefix: rand::thread_rng().gen::().to_string(), candidate_lease_ttl_secs, lock_id: 28320, }; @@ -914,6 +916,7 @@ mod tests { #[tokio::test] async fn test_leader_action() { let leader_value = "test_leader".to_string(); + let store_key_prefix = rand::thread_rng().gen::().to_string(); let candidate_lease_ttl_secs = 5; let client = create_postgres_client().await.unwrap(); @@ -924,7 +927,7 @@ mod tests { is_leader: AtomicBool::new(false), leader_infancy: AtomicBool::new(true), leader_watcher: tx, - store_key_prefix: "test_prefix".to_string(), + store_key_prefix, candidate_lease_ttl_secs, lock_id: 28321, }; @@ -1139,17 +1142,12 @@ mod tests { .query(&step_down_sql(leader_pg_election.lock_id), &[]) .await .unwrap(); 
- - leader_pg_election - .client - .query("DELETE FROM greptime_metakv", &[]) - .await - .unwrap(); } #[tokio::test] async fn test_follower_action() { let candidate_lease_ttl_secs = 5; + let store_key_prefix = rand::thread_rng().gen::().to_string(); let follower_client = create_postgres_client().await.unwrap(); let (tx, mut rx) = broadcast::channel(100); @@ -1159,7 +1157,7 @@ mod tests { is_leader: AtomicBool::new(false), leader_infancy: AtomicBool::new(true), leader_watcher: tx, - store_key_prefix: "test_prefix".to_string(), + store_key_prefix: store_key_prefix.clone(), candidate_lease_ttl_secs, lock_id: 28322, }; @@ -1172,7 +1170,7 @@ mod tests { is_leader: AtomicBool::new(false), leader_infancy: AtomicBool::new(true), leader_watcher: tx, - store_key_prefix: "test_prefix".to_string(), + store_key_prefix, candidate_lease_ttl_secs, lock_id: 28322, }; @@ -1223,11 +1221,5 @@ mod tests { .query(&step_down_sql(leader_pg_election.lock_id), &[]) .await .unwrap(); - - leader_pg_election - .client - .query("DELETE FROM greptime_metakv", &[]) - .await - .unwrap(); } } From 97fb26250244a08d573152687d10e331b4b42744 Mon Sep 17 00:00:00 2001 From: CookiePieWw Date: Fri, 3 Jan 2025 15:31:55 +0800 Subject: [PATCH 08/12] fix(test): just a fix --- src/meta-srv/src/election/postgres.rs | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/src/meta-srv/src/election/postgres.rs b/src/meta-srv/src/election/postgres.rs index ceaf11e77a38..01293b8afd3b 100644 --- a/src/meta-srv/src/election/postgres.rs +++ b/src/meta-srv/src/election/postgres.rs @@ -729,7 +729,11 @@ mod tests { assert!(current == Timestamp::default()); } - async fn candidate(leader_value: String, candidate_lease_ttl_secs: u64) { + async fn candidate( + leader_value: String, + candidate_lease_ttl_secs: u64, + store_key_prefix: String, + ) { let client = create_postgres_client().await.unwrap(); let (tx, _) = broadcast::channel(100); @@ -739,7 +743,7 @@ mod tests { is_leader: 
AtomicBool::new(false), leader_infancy: AtomicBool::new(true), leader_watcher: tx, - store_key_prefix: rand::thread_rng().gen::().to_string(), + store_key_prefix, candidate_lease_ttl_secs, lock_id: 28319, }; @@ -757,17 +761,21 @@ mod tests { async fn test_candidate_registration() { let leader_value_prefix = "test_leader".to_string(); let candidate_lease_ttl_secs = 5; + let store_key_prefix = rand::thread_rng().gen::().to_string(); let mut handles = vec![]; for i in 0..10 { let leader_value = format!("{}{}", leader_value_prefix, i); - let handle = tokio::spawn(candidate(leader_value, candidate_lease_ttl_secs)); + let handle = tokio::spawn(candidate( + leader_value, + candidate_lease_ttl_secs, + store_key_prefix.clone(), + )); handles.push(handle); } // Wait for candidates to registrate themselves and renew their leases at least once. tokio::time::sleep(Duration::from_secs(3)).await; let client = create_postgres_client().await.unwrap(); - let store_key_prefix = rand::thread_rng().gen::().to_string(); let (tx, _) = broadcast::channel(100); let leader_value = "test_leader".to_string(); From a1d4c2bdac7ab72d60ef7220418f95af97548efe Mon Sep 17 00:00:00 2001 From: CookiePieWw Date: Fri, 3 Jan 2025 15:52:42 +0800 Subject: [PATCH 09/12] test: aggregate multiple test to avoid concurrency problem --- src/common/meta/src/kv_backend/postgres.rs | 43 ++++------------------ 1 file changed, 8 insertions(+), 35 deletions(-) diff --git a/src/common/meta/src/kv_backend/postgres.rs b/src/common/meta/src/kv_backend/postgres.rs index ac8dd91bbd18..f17c4e6739d7 100644 --- a/src/common/meta/src/kv_backend/postgres.rs +++ b/src/common/meta/src/kv_backend/postgres.rs @@ -804,70 +804,43 @@ mod tests { } #[tokio::test] - async fn test_put() { + async fn test_pg_crud() { if let Some(kv_backend) = build_pg_kv_backend().await { let prefix = b"put/"; prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await; test_kv_put_with_prefix(&kv_backend, prefix.to_vec()).await; unprepare_kv(&kv_backend, 
prefix).await; - } - } - #[tokio::test] - async fn test_range() { - if let Some(kv_backend) = build_pg_kv_backend().await { let prefix = b"range/"; prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await; test_kv_range_with_prefix(&kv_backend, prefix.to_vec()).await; unprepare_kv(&kv_backend, prefix).await; - } - } - #[tokio::test] - async fn test_range_2() { - if let Some(kv_backend) = build_pg_kv_backend().await { - test_kv_range_2_with_prefix(kv_backend, b"range2/".to_vec()).await; - } - } - - #[tokio::test] - async fn test_batch_get() { - if let Some(kv_backend) = build_pg_kv_backend().await { let prefix = b"batchGet/"; prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await; test_kv_batch_get_with_prefix(&kv_backend, prefix.to_vec()).await; unprepare_kv(&kv_backend, prefix).await; + + let prefix = b"deleteRange/"; + prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await; + test_kv_delete_range_with_prefix(kv_backend, prefix.to_vec()).await; } - } - #[tokio::test(flavor = "multi_thread")] - async fn test_compare_and_put() { if let Some(kv_backend) = build_pg_kv_backend().await { - let kv_backend = Arc::new(kv_backend); - test_kv_compare_and_put_with_prefix(kv_backend, b"compareAndPut/".to_vec()).await; + test_kv_range_2_with_prefix(kv_backend, b"range2/".to_vec()).await; } - } - #[tokio::test] - async fn test_delete_range() { if let Some(kv_backend) = build_pg_kv_backend().await { - let prefix = b"deleteRange/"; - prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await; - test_kv_delete_range_with_prefix(kv_backend, prefix.to_vec()).await; + let kv_backend = Arc::new(kv_backend); + test_kv_compare_and_put_with_prefix(kv_backend, b"compareAndPut/".to_vec()).await; } - } - #[tokio::test] - async fn test_batch_delete() { if let Some(kv_backend) = build_pg_kv_backend().await { let prefix = b"batchDelete/"; prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await; test_kv_batch_delete_with_prefix(kv_backend, prefix.to_vec()).await; } - } - 
#[tokio::test] - async fn test_pg_txn() { if let Some(kv_backend) = build_pg_kv_backend().await { let kv_backend_ref = Arc::new(kv_backend); test_txn_one_compare_op(kv_backend_ref.clone()).await; From 22ff5c60bf99a96f1c81a8b98a1713c040f0b8e3 Mon Sep 17 00:00:00 2001 From: CookiePieWw Date: Fri, 3 Jan 2025 16:49:22 +0800 Subject: [PATCH 10/12] test: use uuid instead of rng --- Cargo.lock | 1 + src/meta-srv/Cargo.toml | 1 + src/meta-srv/src/election/postgres.rs | 11 +++++------ 3 files changed, 7 insertions(+), 6 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 842928528b35..c2fc150f4bfd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6600,6 +6600,7 @@ dependencies = [ "tracing-subscriber", "typetag", "url", + "uuid", ] [[package]] diff --git a/src/meta-srv/Cargo.toml b/src/meta-srv/Cargo.toml index 8fcc9379e631..b383607afe66 100644 --- a/src/meta-srv/Cargo.toml +++ b/src/meta-srv/Cargo.toml @@ -65,6 +65,7 @@ tonic.workspace = true tower.workspace = true typetag.workspace = true url = "2.3" +uuid.workspace = true [dev-dependencies] chrono.workspace = true diff --git a/src/meta-srv/src/election/postgres.rs b/src/meta-srv/src/election/postgres.rs index 01293b8afd3b..35e894404fa2 100644 --- a/src/meta-srv/src/election/postgres.rs +++ b/src/meta-srv/src/election/postgres.rs @@ -632,7 +632,6 @@ impl PgElection { mod tests { use std::env; - use rand::Rng; use tokio_postgres::{Client, NoTls}; use super::*; @@ -669,7 +668,7 @@ mod tests { is_leader: AtomicBool::new(false), leader_infancy: AtomicBool::new(true), leader_watcher: tx, - store_key_prefix: rand::thread_rng().gen::().to_string(), + store_key_prefix: uuid::Uuid::new_v4().to_string(), candidate_lease_ttl_secs: 10, lock_id: 28319, }; @@ -761,7 +760,7 @@ mod tests { async fn test_candidate_registration() { let leader_value_prefix = "test_leader".to_string(); let candidate_lease_ttl_secs = 5; - let store_key_prefix = rand::thread_rng().gen::().to_string(); + let store_key_prefix = uuid::Uuid::new_v4().to_string(); 
let mut handles = vec![]; for i in 0..10 { let leader_value = format!("{}{}", leader_value_prefix, i); @@ -826,7 +825,7 @@ mod tests { is_leader: AtomicBool::new(false), leader_infancy: AtomicBool::new(true), leader_watcher: tx, - store_key_prefix: rand::thread_rng().gen::().to_string(), + store_key_prefix: uuid::Uuid::new_v4().to_string(), candidate_lease_ttl_secs, lock_id: 28320, }; @@ -924,7 +923,7 @@ mod tests { #[tokio::test] async fn test_leader_action() { let leader_value = "test_leader".to_string(); - let store_key_prefix = rand::thread_rng().gen::().to_string(); + let store_key_prefix = uuid::Uuid::new_v4().to_string(); let candidate_lease_ttl_secs = 5; let client = create_postgres_client().await.unwrap(); @@ -1155,7 +1154,7 @@ mod tests { #[tokio::test] async fn test_follower_action() { let candidate_lease_ttl_secs = 5; - let store_key_prefix = rand::thread_rng().gen::().to_string(); + let store_key_prefix = uuid::Uuid::new_v4().to_string(); let follower_client = create_postgres_client().await.unwrap(); let (tx, mut rx) = broadcast::channel(100); From 0979c54175c1f6a55ee4b549ddbd4337ee511ae5 Mon Sep 17 00:00:00 2001 From: CookiePieWw Date: Fri, 3 Jan 2025 18:39:29 +0800 Subject: [PATCH 11/12] perf: batch cmp in txn --- src/common/meta/src/kv_backend/postgres.rs | 62 +++++++++++----------- 1 file changed, 30 insertions(+), 32 deletions(-) diff --git a/src/common/meta/src/kv_backend/postgres.rs b/src/common/meta/src/kv_backend/postgres.rs index f17c4e6739d7..ee0919ea7ac3 100644 --- a/src/common/meta/src/kv_backend/postgres.rs +++ b/src/common/meta/src/kv_backend/postgres.rs @@ -25,7 +25,9 @@ use crate::error::{ CreatePostgresPoolSnafu, Error, GetPostgresConnectionSnafu, PostgresExecutionSnafu, PostgresTransactionSnafu, Result, StrFromUtf8Snafu, }; -use crate::kv_backend::txn::{Txn as KvTxn, TxnOp, TxnOpResponse, TxnResponse as KvTxnResponse}; +use crate::kv_backend::txn::{ + Compare, Txn as KvTxn, TxnOp, TxnOpResponse, TxnResponse as KvTxnResponse, +}; use 
crate::kv_backend::{KvBackend, KvBackendRef, TxnService}; use crate::metrics::METRIC_META_TXN_REQUEST; use crate::rpc::store::{ @@ -331,25 +333,6 @@ impl KvBackend for PgStore { } impl PgStore { - /// Point get with certain client. It's needed for a client with transaction. - async fn point_get_with_query_executor( - &self, - query_executor: &PgQueryExecutor<'_>, - key: &[u8], - ) -> Result>> { - let key = process_bytes(key, "pointGetKey")?; - let res = query_executor.query(POINT_GET, &[&key]).await?; - match res.is_empty() { - true => Ok(None), - false => { - // Safety: We are sure that the row is not empty. - let row = res.first().unwrap(); - let value: String = row.try_get(1).context(PostgresExecutionSnafu)?; - Ok(Some(value.into_bytes())) - } - } - } - async fn range_with_query_executor( &self, query_executor: &PgQueryExecutor<'_>, @@ -651,6 +634,31 @@ impl PgStore { } } + async fn execute_txn_cmp( + &self, + query_executor: &PgQueryExecutor<'_>, + cmp: &[Compare], + ) -> Result { + let batch_get_req = BatchGetRequest { + keys: cmp.iter().map(|c| c.key.clone()).collect(), + }; + let res = self + .batch_get_with_query_executor(query_executor, batch_get_req) + .await?; + let res_map = res + .kvs + .into_iter() + .map(|kv| (kv.key, kv.value)) + .collect::, Vec>>(); + for c in cmp { + let value = res_map.get(&c.key); + if !c.compare_value(value) { + return Ok(false); + } + } + Ok(true) + } + async fn execute_txn_op( &self, query_executor: &PgQueryExecutor<'_>, @@ -707,7 +715,7 @@ impl TxnService for PgStore { async fn txn(&self, txn: KvTxn) -> Result { let _timer = METRIC_META_TXN_REQUEST - .with_label_values(&["etcd", "txn"]) + .with_label_values(&["postgres", "txn"]) .start_timer(); let mut client = self.get_client().await?; @@ -718,17 +726,7 @@ impl TxnService for PgStore { )?); let mut success = true; if txn.c_when { - for cmp in txn.req.compare { - let value = self - .point_get_with_query_executor(&pg_txn, &cmp.key) - .await?; - if 
cmp.compare_value(value.as_ref()) { - success = true; - } else { - success = false; - break; - } - } + success = self.execute_txn_cmp(&pg_txn, &txn.req.compare).await?; } let mut responses = vec![]; if success && txn.c_then { From 471bc4394188ca586da084bdb5bdde83f29a7320 Mon Sep 17 00:00:00 2001 From: CookiePieWw Date: Fri, 3 Jan 2025 20:41:16 +0800 Subject: [PATCH 12/12] perf: batch same op in txn --- src/cli/Cargo.toml | 3 + src/cli/src/bench.rs | 27 ++- src/common/meta/src/kv_backend/postgres.rs | 226 ++++++++++++++++++--- 3 files changed, 219 insertions(+), 37 deletions(-) diff --git a/src/cli/Cargo.toml b/src/cli/Cargo.toml index 9a3d37bd2a34..48648dd0b935 100644 --- a/src/cli/Cargo.toml +++ b/src/cli/Cargo.toml @@ -4,6 +4,9 @@ version.workspace = true edition.workspace = true license.workspace = true +[features] +pg_kvbackend = ["common-meta/pg_kvbackend"] + [lints] workspace = true diff --git a/src/cli/src/bench.rs b/src/cli/src/bench.rs index 9731bf8e6fa6..c04512548033 100644 --- a/src/cli/src/bench.rs +++ b/src/cli/src/bench.rs @@ -22,6 +22,9 @@ use clap::Parser; use common_error::ext::BoxedError; use common_meta::key::{TableMetadataManager, TableMetadataManagerRef}; use common_meta::kv_backend::etcd::EtcdStore; +use common_meta::kv_backend::memory::MemoryKvBackend; +#[cfg(feature = "pg_kvbackend")] +use common_meta::kv_backend::postgres::PgStore; use common_meta::peer::Peer; use common_meta::rpc::router::{Region, RegionRoute}; use common_telemetry::info; @@ -55,18 +58,32 @@ where #[derive(Debug, Default, Parser)] pub struct BenchTableMetadataCommand { #[clap(long)] - etcd_addr: String, + etcd_addr: Option, + #[cfg(feature = "pg_kvbackend")] + #[clap(long)] + postgres_addr: Option, #[clap(long)] count: u32, } impl BenchTableMetadataCommand { pub async fn build(&self) -> std::result::Result, BoxedError> { - let etcd_store = EtcdStore::with_endpoints([&self.etcd_addr], 128) - .await - .unwrap(); + let kv_backend = if let Some(etcd_addr) = &self.etcd_addr { + 
info!("Using etcd as kv backend"); + EtcdStore::with_endpoints([etcd_addr], 128).await.unwrap() + } else { + Arc::new(MemoryKvBackend::new()) + }; + + #[cfg(feature = "pg_kvbackend")] + let kv_backend = if let Some(postgres_addr) = &self.postgres_addr { + info!("Using postgres as kv backend"); + PgStore::with_url(postgres_addr, 128).await.unwrap() + } else { + kv_backend + }; - let table_metadata_manager = Arc::new(TableMetadataManager::new(etcd_store)); + let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend)); let tool = BenchTableMetadata { table_metadata_manager, diff --git a/src/common/meta/src/kv_backend/postgres.rs b/src/common/meta/src/kv_backend/postgres.rs index ee0919ea7ac3..b75f045314ec 100644 --- a/src/common/meta/src/kv_backend/postgres.rs +++ b/src/common/meta/src/kv_backend/postgres.rs @@ -175,6 +175,21 @@ impl PgStore { } } + async fn get_client_executor(&self) -> Result> { + let client = self.get_client().await?; + Ok(PgQueryExecutor::Client(client)) + } + + async fn get_txn_executor<'a>(&self, client: &'a mut PgClient) -> Result> { + let txn = client + .transaction() + .await + .context(PostgresTransactionSnafu { + operation: "start".to_string(), + })?; + Ok(PgQueryExecutor::Transaction(txn)) + } + async fn put_if_not_exists_with_query_executor( &self, query_executor: &PgQueryExecutor<'_>, @@ -290,45 +305,38 @@ impl KvBackend for PgStore { } async fn range(&self, req: RangeRequest) -> Result { - let client = self.get_client().await?; - self.range_with_query_executor(&PgQueryExecutor::Client(client), req) - .await + let client = self.get_client_executor().await?; + self.range_with_query_executor(&client, req).await } async fn put(&self, req: PutRequest) -> Result { - let client = self.get_client().await?; - self.put_with_query_executor(&PgQueryExecutor::Client(client), req) - .await + let client = self.get_client_executor().await?; + self.put_with_query_executor(&client, req).await } async fn batch_put(&self, req: 
BatchPutRequest) -> Result { - let client = self.get_client().await?; - self.batch_put_with_query_executor(&PgQueryExecutor::Client(client), req) - .await + let client = self.get_client_executor().await?; + self.batch_put_with_query_executor(&client, req).await } async fn batch_get(&self, req: BatchGetRequest) -> Result { - let client = self.get_client().await?; - self.batch_get_with_query_executor(&PgQueryExecutor::Client(client), req) - .await + let client = self.get_client_executor().await?; + self.batch_get_with_query_executor(&client, req).await } async fn delete_range(&self, req: DeleteRangeRequest) -> Result { - let client = self.get_client().await?; - self.delete_range_with_query_executor(&PgQueryExecutor::Client(client), req) - .await + let client = self.get_client_executor().await?; + self.delete_range_with_query_executor(&client, req).await } async fn batch_delete(&self, req: BatchDeleteRequest) -> Result { - let client = self.get_client().await?; - self.batch_delete_with_query_executor(&PgQueryExecutor::Client(client), req) - .await + let client = self.get_client_executor().await?; + self.batch_delete_with_query_executor(&client, req).await } async fn compare_and_put(&self, req: CompareAndPutRequest) -> Result { - let client = self.get_client().await?; - self.compare_and_put_with_query_executor(&PgQueryExecutor::Client(client), req) - .await + let client = self.get_client_executor().await?; + self.compare_and_put_with_query_executor(&client, req).await } } @@ -659,6 +667,135 @@ impl PgStore { Ok(true) } + /// Execute a batch of transaction operations. This function is only used for transactions with the same operation type. + async fn try_batch_txn( + &self, + query_executor: &PgQueryExecutor<'_>, + txn_ops: &[TxnOp], + ) -> Result>> { + if !check_txn_ops(txn_ops)? 
{
+            return Ok(None);
+        }
+        match txn_ops.first() {
+            Some(TxnOp::Delete(_)) => {
+                let mut batch_del_req = BatchDeleteRequest {
+                    keys: vec![],
+                    // prev_kv must be true: the response map below is built from
+                    // prev_kvs, which is empty when this flag is false.
+                    prev_kv: true,
+                };
+                for op in txn_ops {
+                    if let TxnOp::Delete(key) = op {
+                        batch_del_req.keys.push(key.clone());
+                    }
+                }
+                let res = self
+                    .batch_delete_with_query_executor(query_executor, batch_del_req)
+                    .await?;
+                let res_map = res
+                    .prev_kvs
+                    .into_iter()
+                    .map(|kv| (kv.key, kv.value))
+                    .collect::<HashMap<Vec<u8>, Vec<u8>>>();
+                let mut resps = Vec::with_capacity(txn_ops.len());
+                for op in txn_ops {
+                    if let TxnOp::Delete(key) = op {
+                        let value = res_map.get(key);
+                        resps.push(TxnOpResponse::ResponseDelete(DeleteRangeResponse {
+                            deleted: if value.is_some() { 1 } else { 0 },
+                            prev_kvs: value
+                                .map(|v| {
+                                    vec![KeyValue {
+                                        key: key.clone(),
+                                        value: v.clone(),
+                                    }]
+                                })
+                                .unwrap_or_default(),
+                        }));
+                    }
+                }
+                Ok(Some(resps))
+            }
+            Some(TxnOp::Put(_, _)) => {
+                let mut batch_put_req = BatchPutRequest {
+                    kvs: vec![],
+                    // prev_kv must be true for the same reason as the delete branch.
+                    prev_kv: true,
+                };
+                for op in txn_ops {
+                    if let TxnOp::Put(key, value) = op {
+                        batch_put_req.kvs.push(KeyValue {
+                            key: key.clone(),
+                            value: value.clone(),
+                        });
+                    }
+                }
+                let res = self
+                    .batch_put_with_query_executor(query_executor, batch_put_req)
+                    .await?;
+                let res_map = res
+                    .prev_kvs
+                    .into_iter()
+                    .map(|kv| (kv.key, kv.value))
+                    .collect::<HashMap<Vec<u8>, Vec<u8>>>();
+                let mut resps = Vec::with_capacity(txn_ops.len());
+                for op in txn_ops {
+                    if let TxnOp::Put(key, _) = op {
+                        let prev_kv = res_map.get(key);
+                        match prev_kv {
+                            Some(v) => {
+                                resps.push(TxnOpResponse::ResponsePut(PutResponse {
+                                    prev_kv: Some(KeyValue {
+                                        key: key.clone(),
+                                        value: v.clone(),
+                                    }),
+                                }));
+                            }
+                            None => {
+                                resps.push(TxnOpResponse::ResponsePut(PutResponse {
+                                    prev_kv: None,
+                                }));
+                            }
+                        }
+                    }
+                }
+                Ok(Some(resps))
+            }
+            Some(TxnOp::Get(_)) => {
+                let mut batch_get_req = BatchGetRequest { keys: vec![] };
+                for op in txn_ops {
+                    if let TxnOp::Get(key) = op {
+                        batch_get_req.keys.push(key.clone());
+                    }
+                }
+                let res = self
.batch_get_with_query_executor(query_executor, batch_get_req)
+                    .await?;
+                let res_map = res
+                    .kvs
+                    .into_iter()
+                    .map(|kv| (kv.key, kv.value))
+                    .collect::<HashMap<Vec<u8>, Vec<u8>>>();
+                let mut resps = Vec::with_capacity(txn_ops.len());
+                for op in txn_ops {
+                    if let TxnOp::Get(key) = op {
+                        let value = res_map.get(key);
+                        resps.push(TxnOpResponse::ResponseGet(RangeResponse {
+                            kvs: value
+                                .map(|v| {
+                                    vec![KeyValue {
+                                        key: key.clone(),
+                                        value: v.clone(),
+                                    }]
+                                })
+                                .unwrap_or_default(),
+                            more: false,
+                        }));
+                    }
+                }
+                Ok(Some(resps))
+            }
+            None => Ok(Some(vec![])),
+        }
+    }
+
     async fn execute_txn_op(
         &self,
         query_executor: &PgQueryExecutor<'_>,
@@ -719,25 +856,31 @@ impl TxnService for PgStore {
             .start_timer();
 
         let mut client = self.get_client().await?;
-        let pg_txn = PgQueryExecutor::Transaction(client.transaction().await.context(
-            PostgresTransactionSnafu {
-                operation: "start".to_string(),
-            },
-        )?);
+        let pg_txn = self.get_txn_executor(&mut client).await?;
 
         let mut success = true;
         if txn.c_when {
             success = self.execute_txn_cmp(&pg_txn, &txn.req.compare).await?;
         }
         let mut responses = vec![];
         if success && txn.c_then {
-            for txnop in txn.req.success {
-                let res = self.execute_txn_op(&pg_txn, txnop).await?;
-                responses.push(res);
+            match self.try_batch_txn(&pg_txn, &txn.req.success).await? {
+                Some(res) => responses.extend(res),
+                None => {
+                    for txnop in txn.req.success {
+                        let res = self.execute_txn_op(&pg_txn, txnop).await?;
+                        responses.push(res);
+                    }
+                }
             }
         } else if !success && txn.c_else {
-            for txnop in txn.req.failure {
-                let res = self.execute_txn_op(&pg_txn, txnop).await?;
-                responses.push(res);
+            match self.try_batch_txn(&pg_txn, &txn.req.failure).await?
{
+                Some(res) => responses.extend(res),
+                None => {
+                    for txnop in txn.req.failure {
+                        let res = self.execute_txn_op(&pg_txn, txnop).await?;
+                        responses.push(res);
+                    }
+                }
+            }
         }
 
@@ -765,6 +908,25 @@ fn is_prefix_range(start: &[u8], end: &[u8]) -> bool {
     false
 }
 
+/// Check if the transaction operations are the same type.
+fn check_txn_ops(txn_ops: &[TxnOp]) -> Result<bool> {
+    if txn_ops.is_empty() {
+        return Ok(false);
+    }
+    let first_op = &txn_ops[0];
+    for op in txn_ops {
+        match (op, first_op) {
+            (TxnOp::Put(_, _), TxnOp::Put(_, _)) => {}
+            (TxnOp::Get(_), TxnOp::Get(_)) => {}
+            (TxnOp::Delete(_), TxnOp::Delete(_)) => {}
+            _ => {
+                return Ok(false);
+            }
+        }
+    }
+    Ok(true)
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;