Skip to content

Commit

Permalink
[improve](cloud-mow) remove_delete_bitmap_update_lock rpc retry in small interval when TXN_CONFLICT
Browse files Browse the repository at this point in the history
  • Loading branch information
mymeiyi committed Jan 16, 2025
1 parent fc1f683 commit a2d8dae
Show file tree
Hide file tree
Showing 3 changed files with 165 additions and 29 deletions.
4 changes: 4 additions & 0 deletions cloud/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,10 @@ CONF_Int32(txn_store_retry_times, "4");
CONF_Int32(txn_store_retry_base_intervals_ms, "500");
// Whether to retry the txn conflict errors that are returned by the underlying txn store.
CONF_Bool(enable_retry_txn_conflict, "true");
// Retry configs used by remove_delete_bitmap_update_lock when committing the txn fails with
// TXN_CONFLICT:
//  - delete_bitmap_enable_retry_txn_conflict: whether such a conflict is retried at all.
//  - delete_bitmap_txn_conflict_retry_times: maximum number of retries before the conflict is
//    returned to the caller.
//  - delete_bitmap_txn_conflict_retry_base_intervals_ms: base interval of the exponential
//    backoff. The k-th retry (k starting from 0) sleeps 2^k * base plus a random drift in
//    [0, base] milliseconds; the drift is picked once, on the first retry.
CONF_Bool(delete_bitmap_enable_retry_txn_conflict, "true");
CONF_Int32(delete_bitmap_txn_conflict_retry_times, "4");
CONF_Int32(delete_bitmap_txn_conflict_retry_base_intervals_ms, "50");

CONF_mBool(enable_s3_rate_limiter, "false");
CONF_mInt64(s3_get_bucket_tokens, "1000000000000000000");
Expand Down
91 changes: 62 additions & 29 deletions cloud/src/meta-service/meta_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2298,36 +2298,69 @@ void MetaServiceImpl::remove_delete_bitmap_update_lock(
return;
}

RPC_RATE_LIMIT(remove_delete_bitmap_update_lock)
std::unique_ptr<Transaction> txn;
TxnErrorCode err = txn_kv_->create_txn(&txn);
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::CREATE>(err);
msg = "failed to init txn";
return;
}
if (!check_delete_bitmap_lock(code, msg, ss, txn, instance_id, request->table_id(),
request->lock_id(), request->initiator())) {
LOG(WARNING) << "failed to check delete bitmap tablet lock"
<< " table_id=" << request->table_id() << " tablet_id=" << request->tablet_id()
<< " request lock_id=" << request->lock_id()
<< " request initiator=" << request->initiator() << " msg " << msg;
return;
}
std::string lock_key =
meta_delete_bitmap_update_lock_key({instance_id, request->table_id(), -1});
txn->remove(lock_key);
err = txn->commit();
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::COMMIT>(err);
ss << "failed to remove delete bitmap tablet lock , err=" << err;
msg = ss.str();
return;
}
bool test = false;
int32_t retry_times = 0;
uint64_t duration_ms = 0, retry_drift_ms = 0;
while (true) {
response->Clear();
RPC_RATE_LIMIT(remove_delete_bitmap_update_lock)
std::unique_ptr<Transaction> txn;
TxnErrorCode err = txn_kv_->create_txn(&txn);
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::CREATE>(err);
msg = "failed to init txn";
return;
}
if (!check_delete_bitmap_lock(code, msg, ss, txn, instance_id, request->table_id(),
request->lock_id(), request->initiator())) {
LOG(WARNING) << "failed to check delete bitmap tablet lock"
<< " table_id=" << request->table_id()
<< " tablet_id=" << request->tablet_id()
<< " request lock_id=" << request->lock_id()
<< " request initiator=" << request->initiator() << " msg " << msg;
return;
}
std::string lock_key =
meta_delete_bitmap_update_lock_key({instance_id, request->table_id(), -1});
txn->remove(lock_key);
TEST_SYNC_POINT_CALLBACK("remove_delete_bitmap_update_lock_err", &test, &retry_times, &err);
if (!test) {
err = txn->commit();
}
if (err == TxnErrorCode::TXN_OK) {
LOG(INFO) << "remove delete bitmap table lock table_id=" << request->table_id()
<< " tablet_id=" << request->tablet_id() << " lock_id=" << request->lock_id()
<< ", key=" << hex(lock_key) << ", initiator=" << request->initiator();
return;
} else if (err == TxnErrorCode::TXN_CONFLICT &&
config::delete_bitmap_enable_retry_txn_conflict &&
retry_times < config::delete_bitmap_txn_conflict_retry_times) {
if (retry_times == 0) {
// the first retry, add random drift.
duration seed = duration_cast<nanoseconds>(steady_clock::now().time_since_epoch());
std::default_random_engine rng(static_cast<uint64_t>(seed.count()));
retry_drift_ms = std::uniform_int_distribution<uint64_t>(
0, config::delete_bitmap_txn_conflict_retry_base_intervals_ms)(rng);
}

LOG(INFO) << "remove delete bitmap table lock table_id=" << request->table_id()
<< " tablet_id=" << request->tablet_id() << " lock_id=" << request->lock_id()
<< ", key=" << hex(lock_key) << ", initiator=" << request->initiator();
// 1 2 4 8 ...
duration_ms = (1 << retry_times) *
config::delete_bitmap_txn_conflict_retry_base_intervals_ms +
retry_drift_ms;
retry_times += 1;
LOG(WARNING) << __PRETTY_FUNCTION__ << " sleep " << duration_ms
<< " ms before next round, retry times left: "
<< (config::delete_bitmap_txn_conflict_retry_times - retry_times)
<< ", code: " << err << ", msg: " << response->status().msg();
bthread_usleep(duration_ms * 1000);
continue;
} else {
code = cast_as<ErrCategory::COMMIT>(err);
ss << "failed to remove delete bitmap tablet lock , err=" << err;
msg = ss.str();
return;
}
}
}

void MetaServiceImpl::remove_delete_bitmap(google::protobuf::RpcController* controller,
Expand Down
99 changes: 99 additions & 0 deletions cloud/test/meta_service_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5911,6 +5911,105 @@ TEST(MetaServiceTest, GetDeleteBitmapWithRetryTest3) {
SyncPoint::get_instance()->clear_all_call_backs();
}

// Exercises the TXN_CONFLICT retry handling of remove_delete_bitmap_update_lock. The commit
// result is injected through the "remove_delete_bitmap_update_lock_err" sync point, which also
// makes the service skip the real commit.
TEST(MetaServiceTest, RemoveDeleteBitmapUpdateLockTest) {
    auto meta_service = get_meta_service();
    SyncPoint::get_instance()->enable_processing();
    // Drop every registered callback when the test scope is left, on any exit path.
    std::unique_ptr<int, std::function<void(int*)>> defer(
            reinterpret_cast<int*>(0x01),
            [](int*) { SyncPoint::get_instance()->clear_all_call_backs(); });

    brpc::Controller cntl;
    auto* rpc_cntl = reinterpret_cast<::google::protobuf::RpcController*>(&cntl);

    // Take the delete bitmap update lock first, so the later calls have a lock to remove.
    GetDeleteBitmapUpdateLockRequest get_lock_req;
    GetDeleteBitmapUpdateLockResponse get_lock_res;
    get_lock_req.set_cloud_unique_id("test_cloud_unique_id");
    get_lock_req.set_table_id(100);
    get_lock_req.add_partition_ids(123);
    get_lock_req.set_expiration(5);
    get_lock_req.set_lock_id(888);
    get_lock_req.set_initiator(-1);
    meta_service->get_delete_bitmap_update_lock(rpc_cntl, &get_lock_req, &get_lock_res, nullptr);
    ASSERT_EQ(get_lock_res.status().code(), MetaServiceCode::OK);

    RemoveDeleteBitmapUpdateLockRequest remove_lock_req;
    remove_lock_req.set_cloud_unique_id("test_cloud_unique_id");
    remove_lock_req.set_table_id(100);
    remove_lock_req.set_lock_id(888);
    remove_lock_req.set_initiator(-1);

    // Sends one remove_delete_bitmap_update_lock rpc with a fresh response object and hands
    // back the status code found in that response.
    auto remove_lock = [&]() {
        RemoveDeleteBitmapUpdateLockResponse remove_lock_res;
        meta_service->remove_delete_bitmap_update_lock(rpc_cntl, &remove_lock_req,
                                                       &remove_lock_res, nullptr);
        return remove_lock_res.status().code();
    };

    // Registers a sync point callback that sets the "test" flag and overwrites the error code
    // with whatever `pick_err` chooses for the current retry count. `log_prefix` is the
    // per-case head of the log line.
    auto inject_commit_result = [](const char* log_prefix,
                                   std::function<TxnErrorCode(int32_t)> pick_err) {
        SyncPoint::get_instance()->set_call_back(
                "remove_delete_bitmap_update_lock_err",
                [log_prefix, pick_err = std::move(pick_err)](auto&& args) {
                    *try_any_cast<bool*>(args[0]) = true;
                    const int32_t retried = *try_any_cast<int32_t*>(args[1]);
                    auto* injected = try_any_cast<TxnErrorCode*>(args[2]);
                    *injected = pick_err(retried);
                    LOG(INFO) << log_prefix << retried << ", code=" << *injected;
                });
    };

    // case1: every attempt conflicts, so the retries are used up and the rpc finally reports
    // KV_TXN_CONFLICT.
    inject_commit_result("remove_delete_bitmap_update_lock_err 1, retry_times=",
                         [](int32_t) { return TxnErrorCode::TXN_CONFLICT; });
    ASSERT_EQ(remove_lock(), MetaServiceCode::KV_TXN_CONFLICT);

    // case2: the first two attempts conflict, the next one fails with TXN_TOO_OLD, which is
    // reported without further retries.
    inject_commit_result("remove_delete_bitmap_update_lock_err 2, retry_times=",
                         [](int32_t retried) {
                             if (retried < 2) {
                                 return TxnErrorCode::TXN_CONFLICT;
                             }
                             return TxnErrorCode::TXN_TOO_OLD;
                         });
    ASSERT_EQ(remove_lock(), MetaServiceCode::KV_TXN_TOO_OLD);

    // case3: the first two attempts conflict, the next one is reported as TXN_OK.
    inject_commit_result("remove_delete_bitmap_update_lock_err 3, retry_times=",
                         [](int32_t retried) {
                             if (retried < 2) {
                                 return TxnErrorCode::TXN_CONFLICT;
                             }
                             return TxnErrorCode::TXN_OK;
                         });
    ASSERT_EQ(remove_lock(), MetaServiceCode::OK);

    // case4: no callback installed, the rpc runs its regular commit path.
    LOG(INFO) << "remove_delete_bitmap_update_lock_err 4";
    SyncPoint::get_instance()->clear_all_call_backs();
    ASSERT_EQ(remove_lock(), MetaServiceCode::OK);
}

TEST(MetaServiceTest, GetVersion) {
auto service = get_meta_service();

Expand Down

0 comments on commit a2d8dae

Please sign in to comment.