Skip to content

Commit

Permalink
[fix](cloud-mow) MS should create new Transaction to continue getting …
Browse files Browse the repository at this point in the history
…delete bitmap when encountering TXN_TOO_OLD (#43509)

When the delete bitmap count is large, getting the delete bitmap may encounter
TXN_TOO_OLD. MS should create a new transaction to read the remaining
data instead of returning the TXN_TOO_OLD code.
  • Loading branch information
hust-hhb authored Nov 13, 2024
1 parent 900bf91 commit 00e5ab8
Show file tree
Hide file tree
Showing 5 changed files with 232 additions and 6 deletions.
3 changes: 3 additions & 0 deletions cloud/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,9 @@ CONF_Validator(s3_client_http_scheme, [](const std::string& config) -> bool {
// Max retry times for object storage request
CONF_mInt64(max_s3_client_retry, "10");

// Max total bytes of delete bitmap data a single get_delete_bitmap request may return, default is 1GB
CONF_mInt64(max_get_delete_bitmap_byte, "1073741824");

CONF_Bool(enable_cloud_txn_lazy_commit, "true");
CONF_Int32(txn_lazy_commit_rowsets_thresold, "1000");
CONF_Int32(txn_lazy_commit_num_threads, "8");
Expand Down
64 changes: 60 additions & 4 deletions cloud/src/meta-service/meta_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1907,6 +1907,12 @@ void MetaServiceImpl::get_delete_bitmap(google::protobuf::RpcController* control
return;
}

response->set_tablet_id(tablet_id);
int64_t delete_bitmap_num = 0;
int64_t delete_bitmap_byte = 0;
bool test = false;
TEST_SYNC_POINT_CALLBACK("get_delete_bitmap_test", &test);

for (size_t i = 0; i < rowset_ids.size(); i++) {
// create a new transaction every time, avoid using one transaction that takes too long
std::unique_ptr<Transaction> txn;
Expand All @@ -1931,11 +1937,40 @@ void MetaServiceImpl::get_delete_bitmap(google::protobuf::RpcController* control
std::unique_ptr<RangeGetIterator> it;
int64_t last_ver = -1;
int64_t last_seg_id = -1;
int64_t round = 0;
do {
err = txn->get(start_key, end_key, &it);
if (test) {
LOG(INFO) << "test";
err = txn->get(start_key, end_key, &it, false, 2);
} else {
err = txn->get(start_key, end_key, &it);
}
TEST_SYNC_POINT_CALLBACK("get_delete_bitmap_err", &round, &err);
int64_t retry = 0;
while (err == TxnErrorCode::TXN_TOO_OLD && retry < 3) {
txn = nullptr;
err = txn_kv_->create_txn(&txn);
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::CREATE>(err);
ss << "failed to init txn, retry=" << retry << ", internal round=" << round;
msg = ss.str();
return;
}
if (test) {
err = txn->get(start_key, end_key, &it, false, 2);
} else {
err = txn->get(start_key, end_key, &it);
}
retry++;
LOG(INFO) << "retry get delete bitmap, tablet=" << tablet_id << ", retry=" << retry
<< ", internal round=" << round
<< ", delete_bitmap_num=" << delete_bitmap_num
<< ", delete_bitmap_byte=" << delete_bitmap_byte;
}
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::READ>(err);
ss << "internal error, failed to get delete bitmap, ret=" << err;
ss << "internal error, failed to get delete bitmap, internal round=" << round
<< ", ret=" << err;
msg = ss.str();
return;
}
Expand All @@ -1960,18 +1995,39 @@ void MetaServiceImpl::get_delete_bitmap(google::protobuf::RpcController* control
response->add_segment_delete_bitmaps(std::string(v));
last_ver = ver;
last_seg_id = seg_id;
delete_bitmap_num++;
delete_bitmap_byte += v.length();
} else {
TEST_SYNC_POINT_CALLBACK("get_delete_bitmap_code", &code);
if (code != MetaServiceCode::OK) {
msg = "test get get_delete_bitmap fail,code=" + MetaServiceCode_Name(code);
ss << "test get get_delete_bitmap fail, code=" << MetaServiceCode_Name(code)
<< ", internal round=" << round;
msg = ss.str();
return;
}
delete_bitmap_byte += v.length();
response->mutable_segment_delete_bitmaps()->rbegin()->append(v);
}
}
if (delete_bitmap_byte > config::max_get_delete_bitmap_byte) {
code = MetaServiceCode::KV_TXN_GET_ERR;
ss << "tablet=" << tablet_id << ", get_delete_bitmap_byte=" << delete_bitmap_byte
<< ",exceed max byte";
msg = ss.str();
LOG(WARNING) << msg;
return;
}
round++;
start_key = it->next_begin_key(); // Update to next smallest key for iteration
} while (it->more());
}
LOG(INFO) << "get delete bitmap for tablet=" << tablet_id << ", rowset=" << rowset_ids[i]
<< ", start version=" << begin_versions[i] << ", end version=" << end_versions[i]
<< ", internal round=" << round << ", delete_bitmap_num=" << delete_bitmap_num
<< ", delete_bitmap_byte=" << delete_bitmap_byte;
}
LOG(INFO) << "finish get delete bitmap for tablet=" << tablet_id
<< ", delete_bitmap_num=" << delete_bitmap_num
<< ", delete_bitmap_byte=" << delete_bitmap_byte;

if (request->has_idx()) {
std::unique_ptr<Transaction> txn;
Expand Down
3 changes: 2 additions & 1 deletion cloud/src/meta-service/meta_service_helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,8 @@ void finish_rpc(std::string_view func_name, brpc::Controller* ctrl, Response* re
}
LOG(INFO) << "finish " << func_name << " from " << ctrl->remote_side()
<< " status=" << res->status().ShortDebugString()
<< " delete_bitmap_size=" << res->segment_delete_bitmaps_size();
<< " tablet=" << res->tablet_id()
<< " delete_bitmap_count=" << res->segment_delete_bitmaps_size();
} else if constexpr (std::is_same_v<Response, GetObjStoreInfoResponse> ||
std::is_same_v<Response, GetStageResponse>) {
std::string debug_string = res->DebugString();
Expand Down
167 changes: 166 additions & 1 deletion cloud/test/meta_service_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5140,7 +5140,7 @@ TEST(MetaServiceTest, DeleteBimapCommitTxnTest) {
}
}

TEST(MetaServiceTest, GetDeleteBitmapWithRetryTest) {
TEST(MetaServiceTest, GetDeleteBitmapWithRetryTest1) {
auto meta_service = get_meta_service();
SyncPoint::get_instance()->enable_processing();
size_t index = 0;
Expand Down Expand Up @@ -5211,6 +5211,171 @@ TEST(MetaServiceTest, GetDeleteBitmapWithRetryTest) {
SyncPoint::get_instance()->clear_all_call_backs();
}

// Checks that get_delete_bitmap still succeeds and returns every bitmap unchanged when one
// of its range reads is reported as TXN_TOO_OLD part-way through.
//
// Two sync points are hooked:
//   - "get_delete_bitmap_test" sets the service's `test` flag, which switches the read to
//     txn->get(start_key, end_key, &it, false, 2).
//     NOTE(review): the trailing `2` is presumably a small per-read limit that forces
//     several internal rounds -- confirm against Transaction::get.
//   - "get_delete_bitmap_err" overwrites the read result with TXN_TOO_OLD exactly once:
//     the first time it fires with round > 2.
TEST(MetaServiceTest, GetDeleteBitmapWithRetryTest2) {
    constexpr int kSegmentCount = 5;
    constexpr int kBitmapBytes = 300 * 1000 * 3;

    auto meta_service = get_meta_service();
    size_t index = 0;

    // The callbacks registered below capture `index` by reference, and a failing ASSERT_*
    // returns from the test body immediately. Doing the teardown in a destructor guarantees
    // the callbacks are removed before `index` goes out of scope, so a failed assertion
    // cannot leave dangling callbacks enabled for the tests that run afterwards.
    struct SyncPointCleanup {
        ~SyncPointCleanup() {
            SyncPoint::get_instance()->disable_processing();
            SyncPoint::get_instance()->clear_all_call_backs();
        }
    } sync_point_cleanup;

    SyncPoint::get_instance()->enable_processing();
    SyncPoint::get_instance()->set_call_back("get_delete_bitmap_test", [&](auto&& args) {
        auto* test = try_any_cast<bool*>(args[0]);
        *test = true;
        LOG(INFO) << "GET_DELETE_BITMAP_TEST, test=" << *test;
    });
    SyncPoint::get_instance()->set_call_back("get_delete_bitmap_err", [&](auto&& args) {
        auto* round = try_any_cast<int64_t*>(args[0]);
        LOG(INFO) << "GET_DELETE_BITMAP_CODE,index=" << index << ",round=" << *round;
        // `index` is bumped on every call with round > 2; only the first such call
        // (index == 1) injects the error.
        if (*round > 2 && ++index < 2) {
            *try_any_cast<TxnErrorCode*>(args[1]) = TxnErrorCode::TXN_TOO_OLD;
        }
    });

    // get delete bitmap update lock
    brpc::Controller cntl;
    GetDeleteBitmapUpdateLockRequest get_lock_req;
    GetDeleteBitmapUpdateLockResponse get_lock_res;
    get_lock_req.set_cloud_unique_id("test_cloud_unique_id");
    get_lock_req.set_table_id(100);
    get_lock_req.add_partition_ids(123);
    get_lock_req.set_expiration(5);
    get_lock_req.set_lock_id(888);
    get_lock_req.set_initiator(-1);
    meta_service->get_delete_bitmap_update_lock(
            reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &get_lock_req,
            &get_lock_res, nullptr);
    ASSERT_EQ(get_lock_res.status().code(), MetaServiceCode::OK);

    // first update new key: one random bitmap per (segment i, version i + 1)
    UpdateDeleteBitmapRequest update_delete_bitmap_req;
    UpdateDeleteBitmapResponse update_delete_bitmap_res;
    update_delete_bitmap_req.set_cloud_unique_id("test_cloud_unique_id");
    update_delete_bitmap_req.set_table_id(100);
    update_delete_bitmap_req.set_partition_id(123);
    update_delete_bitmap_req.set_lock_id(888);
    update_delete_bitmap_req.set_initiator(-1);
    update_delete_bitmap_req.set_tablet_id(333);
    std::string rowset_id = "456";
    std::string segment_delete_bitmaps[kSegmentCount];
    for (int i = 0; i < kSegmentCount; i++) {
        segment_delete_bitmaps[i] = generate_random_string(kBitmapBytes);
        update_delete_bitmap_req.add_rowset_ids(rowset_id);
        update_delete_bitmap_req.add_segment_ids(i);
        update_delete_bitmap_req.add_versions(i + 1);
        update_delete_bitmap_req.add_segment_delete_bitmaps(segment_delete_bitmaps[i]);
    }
    meta_service->update_delete_bitmap(reinterpret_cast<google::protobuf::RpcController*>(&cntl),
                                       &update_delete_bitmap_req, &update_delete_bitmap_res,
                                       nullptr);
    ASSERT_EQ(update_delete_bitmap_res.status().code(), MetaServiceCode::OK);

    // read everything back for versions [1, kSegmentCount] of the rowset
    GetDeleteBitmapRequest get_delete_bitmap_req;
    GetDeleteBitmapResponse get_delete_bitmap_res;
    get_delete_bitmap_req.set_cloud_unique_id("test_cloud_unique_id");
    get_delete_bitmap_req.set_tablet_id(333);

    get_delete_bitmap_req.add_rowset_ids(rowset_id);
    get_delete_bitmap_req.add_begin_versions(1);
    get_delete_bitmap_req.add_end_versions(kSegmentCount);

    meta_service->get_delete_bitmap(reinterpret_cast<google::protobuf::RpcController*>(&cntl),
                                    &get_delete_bitmap_req, &get_delete_bitmap_res, nullptr);
    ASSERT_EQ(get_delete_bitmap_res.status().code(), MetaServiceCode::OK);
    ASSERT_EQ(get_delete_bitmap_res.rowset_ids_size(), kSegmentCount);
    ASSERT_EQ(get_delete_bitmap_res.segment_ids_size(), kSegmentCount);
    ASSERT_EQ(get_delete_bitmap_res.versions_size(), kSegmentCount);
    ASSERT_EQ(get_delete_bitmap_res.segment_delete_bitmaps_size(), kSegmentCount);

    for (int i = 0; i < kSegmentCount; i++) {
        ASSERT_EQ(get_delete_bitmap_res.rowset_ids(i), rowset_id);
        ASSERT_EQ(get_delete_bitmap_res.segment_ids(i), i);
        ASSERT_EQ(get_delete_bitmap_res.versions(i), i + 1);
        ASSERT_EQ(get_delete_bitmap_res.segment_delete_bitmaps(i), segment_delete_bitmaps[i]);
    }
    // Sync point teardown happens in ~SyncPointCleanup on every exit path.
}

// Same scenario as the other GetDeleteBitmapWithRetry tests, but without hooking
// "get_delete_bitmap_test": the service keeps using its default
// txn->get(start_key, end_key, &it) read. The "get_delete_bitmap_err" hook overwrites the
// read result with TXN_TOO_OLD exactly once, the first time it fires with round > 2, and
// the call is still expected to succeed and return every bitmap unchanged.
// NOTE(review): whether `round` ever exceeds 2 with the default read depends on how many
// range reads the stored data needs, which is not visible from this test -- confirm.
TEST(MetaServiceTest, GetDeleteBitmapWithRetryTest3) {
    constexpr int kSegmentCount = 5;
    constexpr int kBitmapBytes = 300 * 1000 * 3;

    auto meta_service = get_meta_service();
    size_t index = 0;

    // The callback registered below captures `index` by reference, and a failing ASSERT_*
    // returns from the test body immediately. Doing the teardown in a destructor guarantees
    // the callback is removed before `index` goes out of scope, so a failed assertion
    // cannot leave a dangling callback enabled for the tests that run afterwards.
    struct SyncPointCleanup {
        ~SyncPointCleanup() {
            SyncPoint::get_instance()->disable_processing();
            SyncPoint::get_instance()->clear_all_call_backs();
        }
    } sync_point_cleanup;

    SyncPoint::get_instance()->enable_processing();
    SyncPoint::get_instance()->set_call_back("get_delete_bitmap_err", [&](auto&& args) {
        auto* round = try_any_cast<int64_t*>(args[0]);
        LOG(INFO) << "GET_DELETE_BITMAP_CODE,index=" << index << ",round=" << *round;
        // `index` is bumped on every call with round > 2; only the first such call
        // (index == 1) injects the error.
        if (*round > 2 && ++index < 2) {
            *try_any_cast<TxnErrorCode*>(args[1]) = TxnErrorCode::TXN_TOO_OLD;
        }
    });

    // get delete bitmap update lock
    brpc::Controller cntl;
    GetDeleteBitmapUpdateLockRequest get_lock_req;
    GetDeleteBitmapUpdateLockResponse get_lock_res;
    get_lock_req.set_cloud_unique_id("test_cloud_unique_id");
    get_lock_req.set_table_id(100);
    get_lock_req.add_partition_ids(123);
    get_lock_req.set_expiration(5);
    get_lock_req.set_lock_id(888);
    get_lock_req.set_initiator(-1);
    meta_service->get_delete_bitmap_update_lock(
            reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &get_lock_req,
            &get_lock_res, nullptr);
    ASSERT_EQ(get_lock_res.status().code(), MetaServiceCode::OK);

    // first update new key: one random bitmap per (segment i, version i + 1)
    UpdateDeleteBitmapRequest update_delete_bitmap_req;
    UpdateDeleteBitmapResponse update_delete_bitmap_res;
    update_delete_bitmap_req.set_cloud_unique_id("test_cloud_unique_id");
    update_delete_bitmap_req.set_table_id(100);
    update_delete_bitmap_req.set_partition_id(123);
    update_delete_bitmap_req.set_lock_id(888);
    update_delete_bitmap_req.set_initiator(-1);
    update_delete_bitmap_req.set_tablet_id(333);
    std::string rowset_id = "456";
    std::string segment_delete_bitmaps[kSegmentCount];
    for (int i = 0; i < kSegmentCount; i++) {
        segment_delete_bitmaps[i] = generate_random_string(kBitmapBytes);
        update_delete_bitmap_req.add_rowset_ids(rowset_id);
        update_delete_bitmap_req.add_segment_ids(i);
        update_delete_bitmap_req.add_versions(i + 1);
        update_delete_bitmap_req.add_segment_delete_bitmaps(segment_delete_bitmaps[i]);
    }
    meta_service->update_delete_bitmap(reinterpret_cast<google::protobuf::RpcController*>(&cntl),
                                       &update_delete_bitmap_req, &update_delete_bitmap_res,
                                       nullptr);
    ASSERT_EQ(update_delete_bitmap_res.status().code(), MetaServiceCode::OK);

    // read everything back for versions [1, kSegmentCount] of the rowset
    GetDeleteBitmapRequest get_delete_bitmap_req;
    GetDeleteBitmapResponse get_delete_bitmap_res;
    get_delete_bitmap_req.set_cloud_unique_id("test_cloud_unique_id");
    get_delete_bitmap_req.set_tablet_id(333);

    get_delete_bitmap_req.add_rowset_ids(rowset_id);
    get_delete_bitmap_req.add_begin_versions(1);
    get_delete_bitmap_req.add_end_versions(kSegmentCount);

    meta_service->get_delete_bitmap(reinterpret_cast<google::protobuf::RpcController*>(&cntl),
                                    &get_delete_bitmap_req, &get_delete_bitmap_res, nullptr);
    ASSERT_EQ(get_delete_bitmap_res.status().code(), MetaServiceCode::OK);
    ASSERT_EQ(get_delete_bitmap_res.rowset_ids_size(), kSegmentCount);
    ASSERT_EQ(get_delete_bitmap_res.segment_ids_size(), kSegmentCount);
    ASSERT_EQ(get_delete_bitmap_res.versions_size(), kSegmentCount);
    ASSERT_EQ(get_delete_bitmap_res.segment_delete_bitmaps_size(), kSegmentCount);

    for (int i = 0; i < kSegmentCount; i++) {
        ASSERT_EQ(get_delete_bitmap_res.rowset_ids(i), rowset_id);
        ASSERT_EQ(get_delete_bitmap_res.segment_ids(i), i);
        ASSERT_EQ(get_delete_bitmap_res.versions(i), i + 1);
        ASSERT_EQ(get_delete_bitmap_res.segment_delete_bitmaps(i), segment_delete_bitmaps[i]);
    }
    // Sync point teardown happens in ~SyncPointCleanup on every exit path.
}

TEST(MetaServiceTest, GetVersion) {
auto service = get_meta_service();

Expand Down
1 change: 1 addition & 0 deletions gensrc/proto/cloud.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1421,6 +1421,7 @@ message GetDeleteBitmapResponse {
repeated int64 versions = 4;
// Serialized roaring bitmaps indexed with {rowset_id, segment_id, version}
repeated bytes segment_delete_bitmaps = 5;
optional int64 tablet_id = 6;
}

message RemoveDeleteBitmapRequest {
Expand Down

0 comments on commit 00e5ab8

Please sign in to comment.