Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[UT] Fix lake compaction UT #48648

Merged
merged 3 commits into from
Jul 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 86 additions & 31 deletions be/test/storage/lake/compaction_task_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,26 @@ class LakeCompactionTest : public TestBase, testing::WithParamInterface<Compacti
ASSERT_TRUE(dynamic_cast<VerticalCompactionTask*>(task.get()) != nullptr);
}
}

protected:
void SetUp() override {
config::enable_size_tiered_compaction_strategy = GetParam().enable_size_tiered_compaction_strategy;
kevincai marked this conversation as resolved.
Show resolved Hide resolved
config::vertical_compaction_max_columns_per_group = GetParam().vertical_compaction_max_columns_per_group;
config::min_cumulative_compaction_num_singleton_deltas = 1;
clear_and_init_test_dir();
}

void TearDown() override {
remove_test_dir_ignore_error();
config::enable_size_tiered_compaction_strategy = _enable_size_tiered_compaction_strategy;
config::vertical_compaction_max_columns_per_group = _vertical_compaction_max_columns_per_group;
config::min_cumulative_compaction_num_singleton_deltas = _min_cumulative_compaction_num_singleton_deltas;
}

private:
bool _enable_size_tiered_compaction_strategy = config::enable_size_tiered_compaction_strategy;
int64_t _vertical_compaction_max_columns_per_group = config::vertical_compaction_max_columns_per_group;
int64_t _min_cumulative_compaction_num_singleton_deltas = config::min_cumulative_compaction_num_singleton_deltas;
};

class LakeDuplicateKeyCompactionTest : public LakeCompactionTest {
Expand All @@ -67,13 +87,10 @@ class LakeDuplicateKeyCompactionTest : public LakeCompactionTest {
constexpr static const int kChunkSize = 12;

void SetUp() override {
config::enable_size_tiered_compaction_strategy = false;
clear_and_init_test_dir();
LakeCompactionTest::SetUp();
CHECK_OK(_tablet_mgr->put_tablet_metadata(*_tablet_metadata));
}

void TearDown() override { remove_test_dir_ignore_error(); }

Chunk generate_data(int64_t chunk_size) {
std::vector<int> v0(chunk_size);
std::vector<int> v1(chunk_size);
Expand Down Expand Up @@ -119,7 +136,6 @@ class LakeDuplicateKeyCompactionTest : public LakeCompactionTest {
};

TEST_P(LakeDuplicateKeyCompactionTest, test1) {
config::vertical_compaction_max_columns_per_group = GetParam().vertical_compaction_max_columns_per_group;
// Prepare data for writing
auto chunk0 = generate_data(kChunkSize);
auto indexes = std::vector<uint32_t>(kChunkSize);
Expand Down Expand Up @@ -160,16 +176,28 @@ TEST_P(LakeDuplicateKeyCompactionTest, test1) {
ASSERT_EQ(kChunkSize * 3, read(version));

ASSIGN_OR_ABORT(auto new_tablet_metadata, _tablet_mgr->get_tablet_metadata(tablet_id, version));
ASSERT_EQ(1, new_tablet_metadata->cumulative_point());
if (GetParam().enable_size_tiered_compaction_strategy) {
ASSERT_EQ(0, new_tablet_metadata->cumulative_point());
} else {
ASSERT_EQ(1, new_tablet_metadata->cumulative_point());
}
ASSERT_EQ(1, new_tablet_metadata->rowsets_size());
}

INSTANTIATE_TEST_SUITE_P(LakeDuplicateKeyCompactionTest, LakeDuplicateKeyCompactionTest,
::testing::Values(CompactionParam{HORIZONTAL_COMPACTION, 5},
CompactionParam{VERTICAL_COMPACTION, 1}),
::testing::Values(CompactionParam{.algorithm = HORIZONTAL_COMPACTION,
.enable_size_tiered_compaction_strategy = true},
CompactionParam{.algorithm = HORIZONTAL_COMPACTION,
.enable_size_tiered_compaction_strategy = false},
CompactionParam{.algorithm = VERTICAL_COMPACTION,
.vertical_compaction_max_columns_per_group = 1,
.enable_size_tiered_compaction_strategy = true},
CompactionParam{.algorithm = VERTICAL_COMPACTION,
.vertical_compaction_max_columns_per_group = 1,
.enable_size_tiered_compaction_strategy = false}),
to_string_param_name);

TEST_F(LakeDuplicateKeyCompactionTest, test_empty_tablet) {
TEST_P(LakeDuplicateKeyCompactionTest, test_empty_tablet) {
auto version = 1;
ASSERT_EQ(0, read(version));

Expand Down Expand Up @@ -197,12 +225,10 @@ class LakeDuplicateKeyOverlapSegmentsCompactionTest : public LakeCompactionTest
constexpr static const int kChunkSize = 12;

void SetUp() override {
clear_and_init_test_dir();
LakeCompactionTest::SetUp();
CHECK_OK(_tablet_mgr->put_tablet_metadata(*_tablet_metadata));
}

void TearDown() override { remove_test_dir_ignore_error(); }

Chunk generate_data(int64_t chunk_size) {
std::vector<int> v0(chunk_size);
std::vector<int> v1(chunk_size);
Expand Down Expand Up @@ -248,7 +274,6 @@ class LakeDuplicateKeyOverlapSegmentsCompactionTest : public LakeCompactionTest
};

TEST_P(LakeDuplicateKeyOverlapSegmentsCompactionTest, test) {
config::vertical_compaction_max_columns_per_group = GetParam().vertical_compaction_max_columns_per_group;
// Prepare data for writing
auto chunk0 = generate_data(kChunkSize);
auto indexes = std::vector<uint32_t>(kChunkSize);
Expand Down Expand Up @@ -289,7 +314,7 @@ TEST_P(LakeDuplicateKeyOverlapSegmentsCompactionTest, test) {
check_task(task);
auto st = task->execute(CompactionTask::kCancelledFn);
EXPECT_EQ(0, task_context->progress.value());
EXPECT_TRUE(st.is_cancelled()) << st;
EXPECT_TRUE(st.is_aborted()) << st;
}
// Completed compaction task without error
{
Expand All @@ -306,7 +331,11 @@ TEST_P(LakeDuplicateKeyOverlapSegmentsCompactionTest, test) {

// check metadata
ASSIGN_OR_ABORT(auto new_tablet_metadata, _tablet_mgr->get_tablet_metadata(tablet_id, version));
ASSERT_EQ(1, new_tablet_metadata->cumulative_point());
if (GetParam().enable_size_tiered_compaction_strategy) {
ASSERT_EQ(0, new_tablet_metadata->cumulative_point());
} else {
ASSERT_EQ(1, new_tablet_metadata->cumulative_point());
}
ASSERT_EQ(1, new_tablet_metadata->rowsets_size());
ASSERT_EQ(1, new_tablet_metadata->rowsets(0).segments_size());

Expand All @@ -330,8 +359,16 @@ TEST_P(LakeDuplicateKeyOverlapSegmentsCompactionTest, test) {
}

INSTANTIATE_TEST_SUITE_P(LakeDuplicateKeyOverlapSegmentsCompactionTest, LakeDuplicateKeyOverlapSegmentsCompactionTest,
::testing::Values(CompactionParam{HORIZONTAL_COMPACTION, 5},
CompactionParam{VERTICAL_COMPACTION, 1}),
::testing::Values(CompactionParam{.algorithm = HORIZONTAL_COMPACTION,
.enable_size_tiered_compaction_strategy = true},
CompactionParam{.algorithm = HORIZONTAL_COMPACTION,
.enable_size_tiered_compaction_strategy = false},
CompactionParam{.algorithm = VERTICAL_COMPACTION,
.vertical_compaction_max_columns_per_group = 1,
.enable_size_tiered_compaction_strategy = true},
CompactionParam{.algorithm = VERTICAL_COMPACTION,
.vertical_compaction_max_columns_per_group = 1,
.enable_size_tiered_compaction_strategy = false}),
to_string_param_name);

class LakeUniqueKeyCompactionTest : public LakeCompactionTest {
Expand All @@ -347,12 +384,10 @@ class LakeUniqueKeyCompactionTest : public LakeCompactionTest {
constexpr static const int kChunkSize = 12;

void SetUp() override {
clear_and_init_test_dir();
LakeCompactionTest::SetUp();
CHECK_OK(_tablet_mgr->put_tablet_metadata(*_tablet_metadata));
}

void TearDown() override { remove_test_dir_ignore_error(); }

Chunk generate_data(int64_t chunk_size) {
std::vector<int> v0(chunk_size);
std::vector<int> v1(chunk_size);
Expand Down Expand Up @@ -398,7 +433,6 @@ class LakeUniqueKeyCompactionTest : public LakeCompactionTest {
};

TEST_P(LakeUniqueKeyCompactionTest, test1) {
config::vertical_compaction_max_columns_per_group = GetParam().vertical_compaction_max_columns_per_group;
// Prepare data for writing
auto chunk0 = generate_data(kChunkSize);
auto indexes = std::vector<uint32_t>(kChunkSize);
Expand Down Expand Up @@ -439,13 +473,25 @@ TEST_P(LakeUniqueKeyCompactionTest, test1) {
ASSERT_EQ(kChunkSize, read(version));

ASSIGN_OR_ABORT(auto new_tablet_metadata, _tablet_mgr->get_tablet_metadata(tablet_id, version));
ASSERT_EQ(1, new_tablet_metadata->cumulative_point());
if (GetParam().enable_size_tiered_compaction_strategy) {
ASSERT_EQ(0, new_tablet_metadata->cumulative_point());
} else {
ASSERT_EQ(1, new_tablet_metadata->cumulative_point());
}
ASSERT_EQ(1, new_tablet_metadata->rowsets_size());
}

INSTANTIATE_TEST_SUITE_P(LakeUniqueKeyCompactionTest, LakeUniqueKeyCompactionTest,
::testing::Values(CompactionParam{HORIZONTAL_COMPACTION, 5},
CompactionParam{VERTICAL_COMPACTION, 1}),
::testing::Values(CompactionParam{.algorithm = HORIZONTAL_COMPACTION,
.enable_size_tiered_compaction_strategy = true},
CompactionParam{.algorithm = HORIZONTAL_COMPACTION,
.enable_size_tiered_compaction_strategy = false},
CompactionParam{.algorithm = VERTICAL_COMPACTION,
.vertical_compaction_max_columns_per_group = 1,
.enable_size_tiered_compaction_strategy = true},
CompactionParam{.algorithm = VERTICAL_COMPACTION,
.vertical_compaction_max_columns_per_group = 1,
.enable_size_tiered_compaction_strategy = false}),
to_string_param_name);

class LakeUniqueKeyCompactionWithDeleteTest : public LakeCompactionTest {
Expand All @@ -461,12 +507,10 @@ class LakeUniqueKeyCompactionWithDeleteTest : public LakeCompactionTest {
constexpr static const int kChunkSize = 12;

void SetUp() override {
clear_and_init_test_dir();
LakeCompactionTest::SetUp();
CHECK_OK(_tablet_mgr->put_tablet_metadata(*_tablet_metadata));
}

void TearDown() override { remove_test_dir_ignore_error(); }

Chunk generate_data(int64_t chunk_size) {
std::vector<int> v0(chunk_size);
std::vector<int> v1(chunk_size);
Expand Down Expand Up @@ -512,7 +556,6 @@ class LakeUniqueKeyCompactionWithDeleteTest : public LakeCompactionTest {
};

TEST_P(LakeUniqueKeyCompactionWithDeleteTest, test_base_compaction_with_delete) {
config::vertical_compaction_max_columns_per_group = GetParam().vertical_compaction_max_columns_per_group;
// Prepare data for writing
auto chunk0 = generate_data(kChunkSize);
auto indexes = std::vector<uint32_t>(kChunkSize);
Expand Down Expand Up @@ -577,13 +620,25 @@ TEST_P(LakeUniqueKeyCompactionWithDeleteTest, test_base_compaction_with_delete)
ASSERT_EQ(kChunkSize - 4, read(version));

ASSIGN_OR_ABORT(auto new_tablet_metadata, _tablet_mgr->get_tablet_metadata(tablet_id, version));
ASSERT_EQ(1, new_tablet_metadata->cumulative_point());
if (GetParam().enable_size_tiered_compaction_strategy) {
ASSERT_EQ(0, new_tablet_metadata->cumulative_point());
} else {
ASSERT_EQ(1, new_tablet_metadata->cumulative_point());
}
ASSERT_EQ(1, new_tablet_metadata->rowsets_size());
}

INSTANTIATE_TEST_SUITE_P(LakeUniqueKeyCompactionWithDeleteTest, LakeUniqueKeyCompactionWithDeleteTest,
::testing::Values(CompactionParam{HORIZONTAL_COMPACTION, 5},
CompactionParam{VERTICAL_COMPACTION, 1}),
::testing::Values(CompactionParam{.algorithm = HORIZONTAL_COMPACTION,
.enable_size_tiered_compaction_strategy = true},
CompactionParam{.algorithm = HORIZONTAL_COMPACTION,
.enable_size_tiered_compaction_strategy = false},
CompactionParam{.algorithm = VERTICAL_COMPACTION,
.vertical_compaction_max_columns_per_group = 1,
.enable_size_tiered_compaction_strategy = true},
CompactionParam{.algorithm = VERTICAL_COMPACTION,
.vertical_compaction_max_columns_per_group = 1,
.enable_size_tiered_compaction_strategy = false}),
to_string_param_name);

} // namespace starrocks::lake
Expand Down
4 changes: 3 additions & 1 deletion be/test/storage/lake/compaction_test_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,15 @@ struct CompactionParam {
uint32_t vertical_compaction_max_columns_per_group = 5;
bool enable_persistent_index = false;
PersistentIndexTypePB persistent_index_type = PersistentIndexTypePB::LOCAL;
bool enable_size_tiered_compaction_strategy = true;
};

static std::string to_string_param_name(const testing::TestParamInfo<CompactionParam>& info) {
std::stringstream ss;
ss << CompactionUtils::compaction_algorithm_to_string(info.param.algorithm) << "_"
<< info.param.vertical_compaction_max_columns_per_group << "_" << info.param.enable_persistent_index << "_"
<< PersistentIndexTypePB_Name(info.param.persistent_index_type);
<< PersistentIndexTypePB_Name(info.param.persistent_index_type) << "_"
<< info.param.enable_size_tiered_compaction_strategy;
return ss.str();
}

Expand Down
Loading