[fix](recycler) fix idx file leak in recycler (#44908)
Related PR: #38306

Problem Summary:
Tablet schemas written by old versions may not set
inverted_index_storage_format. The recycler only enumerated inverted index
files when that field was present, so rowsets from old versions never had
their idx files deleted and leaked them during recycling.
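
As a rough illustration (the stub type and helper below are hypothetical, not
code from this patch), the fix boils down to treating an unset format field as
V1 instead of skipping index-file recycling entirely:

    // Hypothetical stand-ins for the protobuf types referenced in the patch.
    enum class InvertedIndexStorageFormatPB { V1, V2 };

    struct TabletSchemaStub {
        bool has_inverted_index_storage_format = false; // old versions leave this unset
        InvertedIndexStorageFormatPB inverted_index_storage_format =
                InvertedIndexStorageFormatPB::V1;
    };

    // Old behavior: the format was only consulted when the field was set, so
    // old-version rowsets never had their idx files enumerated (the leak).
    // New behavior: an absent field falls back to V1.
    InvertedIndexStorageFormatPB resolve_index_format(const TabletSchemaStub& schema) {
        return schema.has_inverted_index_storage_format
                       ? schema.inverted_index_storage_format
                       : InvertedIndexStorageFormatPB::V1;
    }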
airborne12 authored Dec 4, 2024
1 parent 1aa60f5 commit 170b5d3
Showing 2 changed files with 122 additions and 11 deletions.
28 changes: 17 additions & 11 deletions cloud/src/recycler/recycler.cpp
@@ -417,8 +417,12 @@ class InstanceRecycler::InvertedIndexIdCache {
         LOG(WARNING) << "malformed schema value, key=" << hex(schema_key);
         return -1;
     }
-    if (schema.index_size() > 0 && schema.has_inverted_index_storage_format()) {
-        res.first = schema.inverted_index_storage_format();
+    if (schema.index_size() > 0) {
+        InvertedIndexStorageFormatPB index_format = InvertedIndexStorageFormatPB::V1;
+        if (schema.has_inverted_index_storage_format()) {
+            index_format = schema.inverted_index_storage_format();
+        }
+        res.first = index_format;
         res.second.reserve(schema.index_size());
         for (auto& i : schema.index()) {
             if (i.has_index_type() && i.index_type() == IndexType::INVERTED) {
@@ -1382,17 +1386,19 @@ int InstanceRecycler::delete_rowset_data(const doris::RowsetMetaCloudPB& rs_meta
     }
     std::vector<std::string> file_paths;
     auto tablet_schema = rs_meta_pb.tablet_schema();
+    auto index_storage_format = InvertedIndexStorageFormatPB::V1;
     for (int64_t i = 0; i < num_segments; ++i) {
         file_paths.push_back(segment_path(tablet_id, rowset_id, i));
         if (tablet_schema.has_inverted_index_storage_format()) {
-            if (tablet_schema.inverted_index_storage_format() == InvertedIndexStorageFormatPB::V1) {
-                for (const auto& index_id : index_ids) {
-                    file_paths.push_back(inverted_index_path_v1(tablet_id, rowset_id, i,
-                                                                index_id.first, index_id.second));
-                }
-            } else if (!index_ids.empty()) {
-                file_paths.push_back(inverted_index_path_v2(tablet_id, rowset_id, i));
+            index_storage_format = tablet_schema.inverted_index_storage_format();
         }
+        if (index_storage_format == InvertedIndexStorageFormatPB::V1) {
+            for (const auto& index_id : index_ids) {
+                file_paths.push_back(inverted_index_path_v1(tablet_id, rowset_id, i, index_id.first,
+                                                            index_id.second));
+            }
+        } else if (!index_ids.empty()) {
+            file_paths.push_back(inverted_index_path_v2(tablet_id, rowset_id, i));
         }
     }
     // TODO(AlexYue): seems could do batch
@@ -1429,8 +1435,8 @@ int InstanceRecycler::delete_rowset_data(const std::vector<doris::RowsetMetaClou

     // Process inverted indexes
     std::vector<std::pair<int64_t, std::string>> index_ids;
-    // default format as v2.
-    InvertedIndexStorageFormatPB index_format = InvertedIndexStorageFormatPB::V2;
+    // default format as v1.
+    InvertedIndexStorageFormatPB index_format = InvertedIndexStorageFormatPB::V1;
 
     if (rs.has_tablet_schema()) {
         for (const auto& index : rs.tablet_schema().index()) {
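To see why the V1 default matters for cleanup: the V1 layout keeps one idx
file per (segment, index) pair, while V2 merges each segment's indexes into a
single file, so a skipped V1 rowset leaves num_segments * num_indexes files
behind. A small sketch of the two layouts follows; the path formats are
illustrative guesses, not the repo's actual inverted_index_path_v1/_v2
implementations:

    #include <cstdint>
    #include <string>
    #include <vector>

    // Illustrative only; the real layouts are defined by inverted_index_path_v1/_v2.
    std::vector<std::string> v1_index_paths(int64_t tablet_id, const std::string& rowset_id,
                                            int64_t segment_id,
                                            const std::vector<int64_t>& index_ids) {
        // V1: one idx file per (segment, index) pair.
        std::vector<std::string> paths;
        paths.reserve(index_ids.size());
        for (int64_t index_id : index_ids) {
            paths.push_back("data/" + std::to_string(tablet_id) + "/" + rowset_id + "_" +
                            std::to_string(segment_id) + "_" + std::to_string(index_id) + ".idx");
        }
        return paths;
    }

    std::string v2_index_path(int64_t tablet_id, const std::string& rowset_id, int64_t segment_id) {
        // V2: a single merged idx file per segment.
        return "data/" + std::to_string(tablet_id) + "/" + rowset_id + "_" +
               std::to_string(segment_id) + ".idx";
    }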
105 changes: 105 additions & 0 deletions cloud/test/recycler_test.cpp
@@ -3077,4 +3077,109 @@ TEST(RecyclerTest, delete_rowset_data) {
}
}

TEST(RecyclerTest, delete_rowset_data_without_inverted_index_storage_format) {
    auto txn_kv = std::make_shared<MemTxnKv>();
    ASSERT_EQ(txn_kv->init(), 0);

    InstanceInfoPB instance;
    instance.set_instance_id(instance_id);
    auto obj_info = instance.add_obj_info();
    obj_info->set_id("recycle_tmp_rowsets");
    obj_info->set_ak(config::test_s3_ak);
    obj_info->set_sk(config::test_s3_sk);
    obj_info->set_endpoint(config::test_s3_endpoint);
    obj_info->set_region(config::test_s3_region);
    obj_info->set_bucket(config::test_s3_bucket);
    obj_info->set_prefix("recycle_tmp_rowsets");

    std::vector<doris::TabletSchemaCloudPB> schemas;
    for (int i = 0; i < 5; ++i) {
        auto& schema = schemas.emplace_back();
        schema.set_schema_version(i);
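        // Deliberately leave inverted_index_storage_format unset to emulate
        // schemas written by old versions, the case this fix targets.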
        //schema.set_inverted_index_storage_format(InvertedIndexStorageFormatPB::V1);
        for (int j = 0; j < i; ++j) {
            auto index = schema.add_index();
            index->set_index_id(j);
            index->set_index_type(IndexType::INVERTED);
        }
    }

    {
        InstanceRecycler recycler(txn_kv, instance, thread_group,
                                  std::make_shared<TxnLazyCommitter>(txn_kv));
        ASSERT_EQ(recycler.init(), 0);
        auto accessor = recycler.accessor_map_.begin()->second;
        int64_t txn_id_base = 114115;
        int64_t tablet_id_base = 10015;
        int64_t index_id_base = 1000;
        // Delete each rowset directly using one RowsetPB
        for (int i = 0; i < 100; ++i) {
            int64_t txn_id = txn_id_base + i;
            for (int j = 0; j < 20; ++j) {
                auto rowset = create_rowset("recycle_tmp_rowsets", tablet_id_base + j,
                                            index_id_base + j % 4, 5, schemas[i % 5], txn_id);
                create_tmp_rowset(txn_kv.get(), accessor.get(), rowset, i & 1);
                ASSERT_EQ(0, recycler.delete_rowset_data(rowset));
            }
        }

        std::unique_ptr<ListIterator> list_iter;
        ASSERT_EQ(0, accessor->list_all(&list_iter));
        ASSERT_FALSE(list_iter->has_next());
    }
    {
        InstanceInfoPB tmp_instance;
        std::string resource_id = "recycle_tmp_rowsets";
        tmp_instance.set_instance_id(instance_id);
        auto tmp_obj_info = tmp_instance.add_obj_info();
        tmp_obj_info->set_id(resource_id);
        tmp_obj_info->set_ak(config::test_s3_ak);
        tmp_obj_info->set_sk(config::test_s3_sk);
        tmp_obj_info->set_endpoint(config::test_s3_endpoint);
        tmp_obj_info->set_region(config::test_s3_region);
        tmp_obj_info->set_bucket(config::test_s3_bucket);
        tmp_obj_info->set_prefix(resource_id);

        InstanceRecycler recycler(txn_kv, tmp_instance, thread_group,
                                  std::make_shared<TxnLazyCommitter>(txn_kv));
        ASSERT_EQ(recycler.init(), 0);
        auto accessor = recycler.accessor_map_.begin()->second;
        // Delete the files of multiple rowsets in one batch of RowsetMetaCloudPB
        constexpr int index_id = 10001, tablet_id = 10002;
        std::vector<doris::RowsetMetaCloudPB> rowset_pbs;
        for (int i = 0; i < 10; ++i) {
            auto rowset = create_rowset(resource_id, tablet_id, index_id, 5, schemas[i % 5]);
            create_recycle_rowset(
                    txn_kv.get(), accessor.get(), rowset,
                    static_cast<RecycleRowsetPB::Type>(i % (RecycleRowsetPB::Type_MAX + 1)), true);
            rowset_pbs.emplace_back(std::move(rowset));
        }
        ASSERT_EQ(0, recycler.delete_rowset_data(rowset_pbs));
        std::unique_ptr<ListIterator> list_iter;
        ASSERT_EQ(0, accessor->list_all(&list_iter));
        ASSERT_FALSE(list_iter->has_next());
    }
    {
        InstanceRecycler recycler(txn_kv, instance, thread_group,
                                  std::make_shared<TxnLazyCommitter>(txn_kv));
        ASSERT_EQ(recycler.init(), 0);
        auto accessor = recycler.accessor_map_.begin()->second;
        constexpr int index_id = 20001, tablet_id = 20002;
        // Delete each rowset's files directly, using its ids to construct the paths
        for (int i = 0; i < 1000; ++i) {
            auto rowset =
                    create_rowset("recycle_tmp_rowsets", tablet_id, index_id, 5, schemas[i % 5]);
            create_recycle_rowset(txn_kv.get(), accessor.get(), rowset, RecycleRowsetPB::COMPACT,
                                  true);
            ASSERT_EQ(0, recycler.delete_rowset_data(rowset.resource_id(), rowset.tablet_id(),
                                                     rowset.rowset_id_v2()));
        }
        std::unique_ptr<ListIterator> list_iter;
        ASSERT_EQ(0, accessor->list_all(&list_iter));
        ASSERT_FALSE(list_iter->has_next());
    }
}

} // namespace doris::cloud
