Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add Txn for pg kv backend #5266

Merged
merged 12 commits into from
Jan 6, 2025
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions src/cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ version.workspace = true
edition.workspace = true
license.workspace = true

[features]
pg_kvbackend = ["common-meta/pg_kvbackend"]

[lints]
workspace = true

Expand Down
27 changes: 22 additions & 5 deletions src/cli/src/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ use clap::Parser;
use common_error::ext::BoxedError;
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::etcd::EtcdStore;
use common_meta::kv_backend::memory::MemoryKvBackend;
#[cfg(feature = "pg_kvbackend")]
use common_meta::kv_backend::postgres::PgStore;
use common_meta::peer::Peer;
use common_meta::rpc::router::{Region, RegionRoute};
use common_telemetry::info;
Expand Down Expand Up @@ -55,18 +58,32 @@ where
#[derive(Debug, Default, Parser)]
pub struct BenchTableMetadataCommand {
#[clap(long)]
etcd_addr: String,
etcd_addr: Option<String>,
#[cfg(feature = "pg_kvbackend")]
#[clap(long)]
postgres_addr: Option<String>,
#[clap(long)]
count: u32,
}

impl BenchTableMetadataCommand {
pub async fn build(&self) -> std::result::Result<Box<dyn Tool>, BoxedError> {
let etcd_store = EtcdStore::with_endpoints([&self.etcd_addr], 128)
.await
.unwrap();
let kv_backend = if let Some(etcd_addr) = &self.etcd_addr {
info!("Using etcd as kv backend");
EtcdStore::with_endpoints([etcd_addr], 128).await.unwrap()
} else {
Arc::new(MemoryKvBackend::new())
};

#[cfg(feature = "pg_kvbackend")]
let kv_backend = if let Some(postgres_addr) = &self.postgres_addr {
info!("Using postgres as kv backend");
PgStore::with_url(postgres_addr, 128).await.unwrap()
} else {
kv_backend
};

let table_metadata_manager = Arc::new(TableMetadataManager::new(etcd_store));
let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend));

let tool = BenchTableMetadata {
table_metadata_manager,
Expand Down
17 changes: 14 additions & 3 deletions src/common/meta/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -683,6 +683,16 @@ pub enum Error {
location: Location,
},

#[cfg(feature = "pg_kvbackend")]
#[snafu(display("Failed to {} Postgres transaction", operation))]
PostgresTransaction {
#[snafu(source)]
error: tokio_postgres::Error,
#[snafu(implicit)]
location: Location,
operation: String,
},

#[snafu(display(
"Datanode table info not found, table id: {}, datanode id: {}",
table_id,
Expand Down Expand Up @@ -794,9 +804,10 @@ impl ErrorExt for Error {
| EmptyDdlTasks { .. } => StatusCode::InvalidArguments,

#[cfg(feature = "pg_kvbackend")]
PostgresExecution { .. } | CreatePostgresPool { .. } | GetPostgresConnection { .. } => {
StatusCode::Internal
}
PostgresExecution { .. }
| CreatePostgresPool { .. }
| GetPostgresConnection { .. }
| PostgresTransaction { .. } => StatusCode::Internal,
Error::DatanodeTableInfoNotFound { .. } => StatusCode::Internal,
}
}
Expand Down
15 changes: 15 additions & 0 deletions src/common/meta/src/kv_backend/etcd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -542,6 +542,8 @@ mod tests {
prepare_kv_with_prefix, test_kv_batch_delete_with_prefix, test_kv_batch_get_with_prefix,
test_kv_compare_and_put_with_prefix, test_kv_delete_range_with_prefix,
test_kv_put_with_prefix, test_kv_range_2_with_prefix, test_kv_range_with_prefix,
test_txn_compare_equal, test_txn_compare_greater, test_txn_compare_less,
test_txn_compare_not_equal, test_txn_one_compare_op, text_txn_multi_compare_op,
unprepare_kv,
};

Expand Down Expand Up @@ -628,4 +630,17 @@ mod tests {
test_kv_batch_delete_with_prefix(kv_backend, prefix.to_vec()).await;
}
}

#[tokio::test]
async fn test_etcd_txn() {
if let Some(kv_backend) = build_kv_backend().await {
let kv_backend_ref = Arc::new(kv_backend);
test_txn_one_compare_op(kv_backend_ref.clone()).await;
text_txn_multi_compare_op(kv_backend_ref.clone()).await;
test_txn_compare_equal(kv_backend_ref.clone()).await;
test_txn_compare_greater(kv_backend_ref.clone()).await;
test_txn_compare_less(kv_backend_ref.clone()).await;
test_txn_compare_not_equal(kv_backend_ref).await;
}
}
}
15 changes: 14 additions & 1 deletion src/common/meta/src/kv_backend/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,9 @@ mod tests {
use crate::error::Error;
use crate::kv_backend::test::{
prepare_kv, test_kv_batch_delete, test_kv_batch_get, test_kv_compare_and_put,
test_kv_delete_range, test_kv_put, test_kv_range, test_kv_range_2,
test_kv_delete_range, test_kv_put, test_kv_range, test_kv_range_2, test_txn_compare_equal,
test_txn_compare_greater, test_txn_compare_less, test_txn_compare_not_equal,
test_txn_one_compare_op, text_txn_multi_compare_op,
};

async fn mock_mem_store_with_data() -> MemoryKvBackend<Error> {
Expand Down Expand Up @@ -383,4 +385,15 @@ mod tests {

test_kv_batch_delete(kv_backend).await;
}

#[tokio::test]
async fn test_memory_txn() {
let kv_backend = Arc::new(MemoryKvBackend::<Error>::new());
test_txn_one_compare_op(kv_backend.clone()).await;
text_txn_multi_compare_op(kv_backend.clone()).await;
test_txn_compare_equal(kv_backend.clone()).await;
test_txn_compare_greater(kv_backend.clone()).await;
test_txn_compare_less(kv_backend.clone()).await;
test_txn_compare_not_equal(kv_backend).await;
}
}
Loading
Loading