From e33075a7043e8feb0721757735f2eeccf72a6d70 Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Sun, 22 Dec 2024 16:03:41 +0800 Subject: [PATCH] try revert gtest to fix ci Signed-off-by: guo-shaoge --- .../tests/gtest_aggregation_executor.cpp | 136 +++++++----------- 1 file changed, 50 insertions(+), 86 deletions(-) diff --git a/dbms/src/Flash/tests/gtest_aggregation_executor.cpp b/dbms/src/Flash/tests/gtest_aggregation_executor.cpp index e407fcae764..7193f24eddb 100644 --- a/dbms/src/Flash/tests/gtest_aggregation_executor.cpp +++ b/dbms/src/Flash/tests/gtest_aggregation_executor.cpp @@ -24,7 +24,6 @@ namespace DB namespace FailPoints { extern const char force_agg_on_partial_block[]; -extern const char force_agg_prefetch[]; extern const char force_agg_two_level_hash_table_before_merge[]; } // namespace FailPoints namespace tests @@ -239,22 +238,16 @@ class AggExecutorTestRunner : public ExecutorTest ColumnWithUInt64 col_pr{1, 2, 0, 3290124, 968933, 3125, 31236, 4327, 80000}; }; -#define WRAP_FOR_AGG_FAILPOINTS_START \ - std::vector enables{true, false}; \ - for (auto enable : enables) \ - { \ - if (enable) \ - { \ - FailPointHelper::enableFailPoint(FailPoints::force_agg_on_partial_block); \ - FailPointHelper::enableFailPoint(FailPoints::force_agg_prefetch); \ - } \ - else \ - { \ - FailPointHelper::disableFailPoint(FailPoints::force_agg_on_partial_block); \ - FailPointHelper::disableFailPoint(FailPoints::force_agg_prefetch); \ - } +#define WRAP_FOR_AGG_PARTIAL_BLOCK_START \ + std::vector partial_blocks{true, false}; \ + for (auto partial_block : partial_blocks) \ + { \ + if (partial_block) \ + FailPointHelper::enableFailPoint(FailPoints::force_agg_on_partial_block); \ + else \ + FailPointHelper::disableFailPoint(FailPoints::force_agg_on_partial_block); -#define WRAP_FOR_AGG_FAILPOINTS_END } +#define WRAP_FOR_AGG_PARTIAL_BLOCK_END } /// Guarantee the correctness of group by TEST_F(AggExecutorTestRunner, GroupBy) @@ -370,9 +363,9 @@ try FailPointHelper::enableFailPoint(FailPoints::force_agg_two_level_hash_table_before_merge); else FailPointHelper::disableFailPoint(FailPoints::force_agg_two_level_hash_table_before_merge); - WRAP_FOR_AGG_FAILPOINTS_START + WRAP_FOR_AGG_PARTIAL_BLOCK_START executeAndAssertColumnsEqual(request, expect_cols[i]); - WRAP_FOR_AGG_FAILPOINTS_END + WRAP_FOR_AGG_PARTIAL_BLOCK_END } } } @@ -436,9 +429,9 @@ try FailPointHelper::enableFailPoint(FailPoints::force_agg_two_level_hash_table_before_merge); else FailPointHelper::disableFailPoint(FailPoints::force_agg_two_level_hash_table_before_merge); - WRAP_FOR_AGG_FAILPOINTS_START + WRAP_FOR_AGG_PARTIAL_BLOCK_START executeAndAssertColumnsEqual(request, expect_cols[i]); - WRAP_FOR_AGG_FAILPOINTS_END + WRAP_FOR_AGG_PARTIAL_BLOCK_END } } } @@ -471,9 +464,9 @@ try for (size_t i = 0; i < test_num; ++i) { request = buildDAGRequest(std::make_pair(db_name, table_name), agg_funcs[i], group_by_exprs[i], projections[i]); - WRAP_FOR_AGG_FAILPOINTS_START + WRAP_FOR_AGG_PARTIAL_BLOCK_START executeAndAssertColumnsEqual(request, expect_cols[i]); - WRAP_FOR_AGG_FAILPOINTS_END + WRAP_FOR_AGG_PARTIAL_BLOCK_END } /// Min function tests @@ -492,9 +485,9 @@ try for (size_t i = 0; i < test_num; ++i) { request = buildDAGRequest(std::make_pair(db_name, table_name), agg_funcs[i], group_by_exprs[i], projections[i]); - WRAP_FOR_AGG_FAILPOINTS_START + WRAP_FOR_AGG_PARTIAL_BLOCK_START executeAndAssertColumnsEqual(request, expect_cols[i]); - WRAP_FOR_AGG_FAILPOINTS_END + WRAP_FOR_AGG_PARTIAL_BLOCK_END } } CATCH @@ -552,9 +545,9 @@ try { request = buildDAGRequest(std::make_pair(db_name, table_name), {agg_funcs[i]}, group_by_exprs[i], projections[i]); - WRAP_FOR_AGG_FAILPOINTS_START + WRAP_FOR_AGG_PARTIAL_BLOCK_START executeAndAssertColumnsEqual(request, expect_cols[i]); - WRAP_FOR_AGG_FAILPOINTS_END + WRAP_FOR_AGG_PARTIAL_BLOCK_END } } CATCH @@ -622,9 +615,9 @@ try {agg_func}, group_by_exprs[i], projections[i]); - WRAP_FOR_AGG_FAILPOINTS_START + WRAP_FOR_AGG_PARTIAL_BLOCK_START executeAndAssertColumnsEqual(request, expect_cols[i]); - WRAP_FOR_AGG_FAILPOINTS_END + WRAP_FOR_AGG_PARTIAL_BLOCK_END } } { @@ -636,9 +629,9 @@ try {agg_func}, group_by_exprs[i], projections[i]); - WRAP_FOR_AGG_FAILPOINTS_START + WRAP_FOR_AGG_PARTIAL_BLOCK_START executeAndAssertColumnsEqual(request, expect_cols[i]); - WRAP_FOR_AGG_FAILPOINTS_END + WRAP_FOR_AGG_PARTIAL_BLOCK_END } } for (auto collation_id : {0, static_cast(TiDB::ITiDBCollator::BINARY)}) @@ -675,9 +668,9 @@ try {agg_func}, group_by_exprs[i], projections[i]); - WRAP_FOR_AGG_FAILPOINTS_START + WRAP_FOR_AGG_PARTIAL_BLOCK_START executeAndAssertColumnsEqual(request, expect_cols[i]); - WRAP_FOR_AGG_FAILPOINTS_END + WRAP_FOR_AGG_PARTIAL_BLOCK_END } } } @@ -690,9 +683,9 @@ try executeAndAssertColumnsEqual(request, {{toNullableVec({"banana"})}}); request = context.scan("aggnull_test", "t1").aggregation({}, {col("s1")}).build(context); - WRAP_FOR_AGG_FAILPOINTS_START + WRAP_FOR_AGG_PARTIAL_BLOCK_START executeAndAssertColumnsEqual(request, {{toNullableVec("s1", {{}, "banana"})}}); - WRAP_FOR_AGG_FAILPOINTS_END + WRAP_FOR_AGG_PARTIAL_BLOCK_END } CATCH @@ -704,9 +697,9 @@ try = {toNullableVec({3}), toNullableVec({1}), toVec({6})}; auto test_single_function = [&](size_t index) { auto request = context.scan("test_db", "test_table").aggregation({functions[index]}, {}).build(context); - WRAP_FOR_AGG_FAILPOINTS_START + WRAP_FOR_AGG_PARTIAL_BLOCK_START executeAndAssertColumnsEqual(request, {functions_result[index]}); - WRAP_FOR_AGG_FAILPOINTS_END + WRAP_FOR_AGG_PARTIAL_BLOCK_END }; for (size_t i = 0; i < functions.size(); ++i) test_single_function(i); @@ -727,9 +720,9 @@ try results.push_back(functions_result[k]); auto request = context.scan("test_db", "test_table").aggregation(funcs, {}).build(context); - WRAP_FOR_AGG_FAILPOINTS_START + WRAP_FOR_AGG_PARTIAL_BLOCK_START executeAndAssertColumnsEqual(request, results); - WRAP_FOR_AGG_FAILPOINTS_END + WRAP_FOR_AGG_PARTIAL_BLOCK_END funcs.pop_back(); results.pop_back(); @@ -765,9 +758,9 @@ try context.context->setSetting( "group_by_two_level_threshold", Field(static_cast(two_level_threshold))); - WRAP_FOR_AGG_FAILPOINTS_START + WRAP_FOR_AGG_PARTIAL_BLOCK_START executeAndAssertColumnsEqual(request, expect); - WRAP_FOR_AGG_FAILPOINTS_END + WRAP_FOR_AGG_PARTIAL_BLOCK_END } } } @@ -798,7 +791,7 @@ try "group_by_two_level_threshold", Field(static_cast(two_level_threshold))); context.context->setSetting("max_block_size", Field(static_cast(block_size))); - WRAP_FOR_AGG_FAILPOINTS_START + WRAP_FOR_AGG_PARTIAL_BLOCK_START auto blocks = getExecuteStreamsReturnBlocks(request, concurrency); size_t actual_row = 0; for (auto & block : blocks) @@ -807,7 +800,7 @@ try actual_row += block.rows(); } ASSERT_EQ(actual_row, expect_rows[i]); - WRAP_FOR_AGG_FAILPOINTS_END + WRAP_FOR_AGG_PARTIAL_BLOCK_END } } } @@ -921,7 +914,7 @@ try "group_by_two_level_threshold", Field(static_cast(two_level_threshold))); context.context->setSetting("max_block_size", Field(static_cast(block_size))); - WRAP_FOR_AGG_FAILPOINTS_START + WRAP_FOR_AGG_PARTIAL_BLOCK_START auto blocks = getExecuteStreamsReturnBlocks(request, concurrency); for (auto & block : blocks) { @@ -946,7 +939,7 @@ try vstackBlocks(std::move(blocks)).getColumnsWithTypeAndName(), false)); } - WRAP_FOR_AGG_FAILPOINTS_END + WRAP_FOR_AGG_PARTIAL_BLOCK_END } } } @@ -974,18 +967,18 @@ try request = context.receive("empty_recv", 5).aggregation({Max(col("s1"))}, {col("s2")}, 5).build(context); { - WRAP_FOR_AGG_FAILPOINTS_START + WRAP_FOR_AGG_PARTIAL_BLOCK_START executeAndAssertColumnsEqual(request, {}); - WRAP_FOR_AGG_FAILPOINTS_END + WRAP_FOR_AGG_PARTIAL_BLOCK_END } request = context.scan("test_db", "empty_table") .aggregation({Count(lit(Field(static_cast(1))))}, {}) .build(context); { - WRAP_FOR_AGG_FAILPOINTS_START + WRAP_FOR_AGG_PARTIAL_BLOCK_START executeAndAssertColumnsEqual(request, {toVec({0})}); - WRAP_FOR_AGG_FAILPOINTS_END + WRAP_FOR_AGG_PARTIAL_BLOCK_END } } CATCH @@ -1042,31 +1035,6 @@ try toVec("col_tinyint", col_data_tinyint), }); - std::random_device rd; - std::mt19937_64 gen(rd()); - - std::vector max_block_sizes{1, 2, DEFAULT_BLOCK_SIZE}; - std::vector two_level_thresholds{0, 1}; - - std::uniform_int_distribution dist(0, max_block_sizes.size()); - size_t random_block_size = max_block_sizes[dist(gen)]; - - std::uniform_int_distribution dist1(0, two_level_thresholds.size()); - size_t random_two_level_threshold = two_level_thresholds[dist1(gen)]; - LOG_DEBUG( - Logger::get("AggExecutorTestRunner::AggKeyOptimization"), - "max_block_size: {}, two_level_threshold: {}", - random_block_size, - random_two_level_threshold); - - context.context->setSetting("group_by_two_level_threshold_bytes", Field(static_cast(0))); -#define WRAP_FOR_AGG_CHANGE_SETTINGS \ - context.context->setSetting( \ - "group_by_two_level_threshold", \ - Field(static_cast(random_two_level_threshold))); \ - context.context->setSetting("max_block_size", Field(static_cast(random_block_size))); - - FailPointHelper::enableFailPoint(FailPoints::force_agg_prefetch); { // case-1: select count(1), col_tinyint from t group by col_int, col_tinyint // agg method: keys64(AggregationMethodKeysFixed) @@ -1081,7 +1049,6 @@ try toNullableVec("first_row(col_tinyint)", ColumnWithNullableInt8{0, 1, 2, 3}), toVec("col_int", ColumnWithInt32{0, 1, 2, 3}), toVec("col_tinyint", ColumnWithInt8{0, 1, 2, 3})}; - WRAP_FOR_AGG_CHANGE_SETTINGS executeAndAssertColumnsEqual(request, expected); } @@ -1098,7 +1065,6 @@ try = {toVec("count(1)", ColumnWithUInt64{rows_per_type, rows_per_type, rows_per_type, rows_per_type}), toNullableVec("first_row(col_int)", ColumnWithNullableInt32{0, 1, 2, 3}), toVec("col_int", ColumnWithInt32{0, 1, 2, 3})}; - WRAP_FOR_AGG_CHANGE_SETTINGS executeAndAssertColumnsEqual(request, expected); } @@ -1133,7 +1099,6 @@ try toNullableVec("first_row(col_string_with_collator)", ColumnWithNullableString{"a", "b", "c", "d"}), toVec("col_string_with_collator", ColumnWithString{"a", "b", "c", "d"}), }; - WRAP_FOR_AGG_CHANGE_SETTINGS executeAndAssertColumnsEqual(request, expected); } @@ -1151,7 +1116,6 @@ try toVec("count(1)", ColumnWithUInt64{rows_per_type, rows_per_type, rows_per_type, rows_per_type}), toVec("first_row(col_string_with_collator)", ColumnWithString{"a", "b", "c", "d"}), }; - WRAP_FOR_AGG_CHANGE_SETTINGS executeAndAssertColumnsEqual(request, expected); } @@ -1174,7 +1138,6 @@ try toVec("col_int", ColumnWithInt32{0, 1, 2, 3}), toVec("col_string_no_collator", ColumnWithString{"a", "b", "c", "d"}), }; - WRAP_FOR_AGG_CHANGE_SETTINGS executeAndAssertColumnsEqual(request, expected); } @@ -1192,11 +1155,8 @@ try toNullableVec("first_row(col_string_with_collator)", ColumnWithNullableString{"a", "b", "c", "d"}), toVec("col_string_with_collator", ColumnWithString{"a", "b", "c", "d"}), toVec("col_int", ColumnWithInt32{0, 1, 2, 3})}; - WRAP_FOR_AGG_CHANGE_SETTINGS executeAndAssertColumnsEqual(request, expected); } - FailPointHelper::disableFailPoint(FailPoints::force_agg_prefetch); -#undef WRAP_FOR_AGG_CHANGE_SETTINGS } CATCH @@ -1227,9 +1187,13 @@ try context .addExchangeReceiver("exchange_receiver_1_concurrency", column_infos, column_data, 1, partition_column_infos); + context + .addExchangeReceiver("exchange_receiver_3_concurrency", column_infos, column_data, 3, partition_column_infos); + context + .addExchangeReceiver("exchange_receiver_5_concurrency", column_infos, column_data, 5, partition_column_infos); context .addExchangeReceiver("exchange_receiver_10_concurrency", column_infos, column_data, 10, partition_column_infos); - std::vector exchange_receiver_concurrency = {1, 10}; + std::vector exchange_receiver_concurrency = {1, 3, 5, 10}; auto gen_request = [&](size_t exchange_concurrency) { return context @@ -1241,15 +1205,15 @@ try auto baseline = executeStreams(gen_request(1), 1); for (size_t exchange_concurrency : exchange_receiver_concurrency) { - WRAP_FOR_AGG_FAILPOINTS_START + WRAP_FOR_AGG_PARTIAL_BLOCK_START executeAndAssertColumnsEqual(gen_request(exchange_concurrency), baseline); - WRAP_FOR_AGG_FAILPOINTS_END + WRAP_FOR_AGG_PARTIAL_BLOCK_END } } CATCH -#undef WRAP_FOR_AGG_FAILPOINTS_START -#undef WRAP_FOR_AGG_FAILPOINTS_END +#undef WRAP_FOR_AGG_PARTIAL_BLOCK_START +#undef WRAP_FOR_AGG_PARTIAL_BLOCK_END } // namespace tests } // namespace DB