Skip to content

Commit

Permalink
try revert gtest to fix ci
Browse files Browse the repository at this point in the history
Signed-off-by: guo-shaoge <[email protected]>
  • Loading branch information
guo-shaoge committed Dec 22, 2024
1 parent 35d461b commit e33075a
Showing 1 changed file with 50 additions and 86 deletions.
136 changes: 50 additions & 86 deletions dbms/src/Flash/tests/gtest_aggregation_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ namespace DB
namespace FailPoints
{
extern const char force_agg_on_partial_block[];
extern const char force_agg_prefetch[];
extern const char force_agg_two_level_hash_table_before_merge[];
} // namespace FailPoints
namespace tests
Expand Down Expand Up @@ -239,22 +238,16 @@ class AggExecutorTestRunner : public ExecutorTest
ColumnWithUInt64 col_pr{1, 2, 0, 3290124, 968933, 3125, 31236, 4327, 80000};
};

#define WRAP_FOR_AGG_FAILPOINTS_START \
std::vector<bool> enables{true, false}; \
for (auto enable : enables) \
{ \
if (enable) \
{ \
FailPointHelper::enableFailPoint(FailPoints::force_agg_on_partial_block); \
FailPointHelper::enableFailPoint(FailPoints::force_agg_prefetch); \
} \
else \
{ \
FailPointHelper::disableFailPoint(FailPoints::force_agg_on_partial_block); \
FailPointHelper::disableFailPoint(FailPoints::force_agg_prefetch); \
}
#define WRAP_FOR_AGG_PARTIAL_BLOCK_START \
std::vector<bool> partial_blocks{true, false}; \
for (auto partial_block : partial_blocks) \
{ \
if (partial_block) \
FailPointHelper::enableFailPoint(FailPoints::force_agg_on_partial_block); \
else \
FailPointHelper::disableFailPoint(FailPoints::force_agg_on_partial_block);

#define WRAP_FOR_AGG_FAILPOINTS_END }
#define WRAP_FOR_AGG_PARTIAL_BLOCK_END }

/// Guarantee the correctness of group by
TEST_F(AggExecutorTestRunner, GroupBy)
Expand Down Expand Up @@ -370,9 +363,9 @@ try
FailPointHelper::enableFailPoint(FailPoints::force_agg_two_level_hash_table_before_merge);
else
FailPointHelper::disableFailPoint(FailPoints::force_agg_two_level_hash_table_before_merge);
WRAP_FOR_AGG_FAILPOINTS_START
WRAP_FOR_AGG_PARTIAL_BLOCK_START
executeAndAssertColumnsEqual(request, expect_cols[i]);
WRAP_FOR_AGG_FAILPOINTS_END
WRAP_FOR_AGG_PARTIAL_BLOCK_END
}
}
}
Expand Down Expand Up @@ -436,9 +429,9 @@ try
FailPointHelper::enableFailPoint(FailPoints::force_agg_two_level_hash_table_before_merge);
else
FailPointHelper::disableFailPoint(FailPoints::force_agg_two_level_hash_table_before_merge);
WRAP_FOR_AGG_FAILPOINTS_START
WRAP_FOR_AGG_PARTIAL_BLOCK_START
executeAndAssertColumnsEqual(request, expect_cols[i]);
WRAP_FOR_AGG_FAILPOINTS_END
WRAP_FOR_AGG_PARTIAL_BLOCK_END
}
}
}
Expand Down Expand Up @@ -471,9 +464,9 @@ try
for (size_t i = 0; i < test_num; ++i)
{
request = buildDAGRequest(std::make_pair(db_name, table_name), agg_funcs[i], group_by_exprs[i], projections[i]);
WRAP_FOR_AGG_FAILPOINTS_START
WRAP_FOR_AGG_PARTIAL_BLOCK_START
executeAndAssertColumnsEqual(request, expect_cols[i]);
WRAP_FOR_AGG_FAILPOINTS_END
WRAP_FOR_AGG_PARTIAL_BLOCK_END
}

/// Min function tests
Expand All @@ -492,9 +485,9 @@ try
for (size_t i = 0; i < test_num; ++i)
{
request = buildDAGRequest(std::make_pair(db_name, table_name), agg_funcs[i], group_by_exprs[i], projections[i]);
WRAP_FOR_AGG_FAILPOINTS_START
WRAP_FOR_AGG_PARTIAL_BLOCK_START
executeAndAssertColumnsEqual(request, expect_cols[i]);
WRAP_FOR_AGG_FAILPOINTS_END
WRAP_FOR_AGG_PARTIAL_BLOCK_END
}
}
CATCH
Expand Down Expand Up @@ -552,9 +545,9 @@ try
{
request
= buildDAGRequest(std::make_pair(db_name, table_name), {agg_funcs[i]}, group_by_exprs[i], projections[i]);
WRAP_FOR_AGG_FAILPOINTS_START
WRAP_FOR_AGG_PARTIAL_BLOCK_START
executeAndAssertColumnsEqual(request, expect_cols[i]);
WRAP_FOR_AGG_FAILPOINTS_END
WRAP_FOR_AGG_PARTIAL_BLOCK_END
}
}
CATCH
Expand Down Expand Up @@ -622,9 +615,9 @@ try
{agg_func},
group_by_exprs[i],
projections[i]);
WRAP_FOR_AGG_FAILPOINTS_START
WRAP_FOR_AGG_PARTIAL_BLOCK_START
executeAndAssertColumnsEqual(request, expect_cols[i]);
WRAP_FOR_AGG_FAILPOINTS_END
WRAP_FOR_AGG_PARTIAL_BLOCK_END
}
}
{
Expand All @@ -636,9 +629,9 @@ try
{agg_func},
group_by_exprs[i],
projections[i]);
WRAP_FOR_AGG_FAILPOINTS_START
WRAP_FOR_AGG_PARTIAL_BLOCK_START
executeAndAssertColumnsEqual(request, expect_cols[i]);
WRAP_FOR_AGG_FAILPOINTS_END
WRAP_FOR_AGG_PARTIAL_BLOCK_END
}
}
for (auto collation_id : {0, static_cast<int>(TiDB::ITiDBCollator::BINARY)})
Expand Down Expand Up @@ -675,9 +668,9 @@ try
{agg_func},
group_by_exprs[i],
projections[i]);
WRAP_FOR_AGG_FAILPOINTS_START
WRAP_FOR_AGG_PARTIAL_BLOCK_START
executeAndAssertColumnsEqual(request, expect_cols[i]);
WRAP_FOR_AGG_FAILPOINTS_END
WRAP_FOR_AGG_PARTIAL_BLOCK_END
}
}
}
Expand All @@ -690,9 +683,9 @@ try
executeAndAssertColumnsEqual(request, {{toNullableVec<String>({"banana"})}});

request = context.scan("aggnull_test", "t1").aggregation({}, {col("s1")}).build(context);
WRAP_FOR_AGG_FAILPOINTS_START
WRAP_FOR_AGG_PARTIAL_BLOCK_START
executeAndAssertColumnsEqual(request, {{toNullableVec<String>("s1", {{}, "banana"})}});
WRAP_FOR_AGG_FAILPOINTS_END
WRAP_FOR_AGG_PARTIAL_BLOCK_END
}
CATCH

Expand All @@ -704,9 +697,9 @@ try
= {toNullableVec<Int64>({3}), toNullableVec<Int64>({1}), toVec<UInt64>({6})};
auto test_single_function = [&](size_t index) {
auto request = context.scan("test_db", "test_table").aggregation({functions[index]}, {}).build(context);
WRAP_FOR_AGG_FAILPOINTS_START
WRAP_FOR_AGG_PARTIAL_BLOCK_START
executeAndAssertColumnsEqual(request, {functions_result[index]});
WRAP_FOR_AGG_FAILPOINTS_END
WRAP_FOR_AGG_PARTIAL_BLOCK_END
};
for (size_t i = 0; i < functions.size(); ++i)
test_single_function(i);
Expand All @@ -727,9 +720,9 @@ try
results.push_back(functions_result[k]);

auto request = context.scan("test_db", "test_table").aggregation(funcs, {}).build(context);
WRAP_FOR_AGG_FAILPOINTS_START
WRAP_FOR_AGG_PARTIAL_BLOCK_START
executeAndAssertColumnsEqual(request, results);
WRAP_FOR_AGG_FAILPOINTS_END
WRAP_FOR_AGG_PARTIAL_BLOCK_END

funcs.pop_back();
results.pop_back();
Expand Down Expand Up @@ -765,9 +758,9 @@ try
context.context->setSetting(
"group_by_two_level_threshold",
Field(static_cast<UInt64>(two_level_threshold)));
WRAP_FOR_AGG_FAILPOINTS_START
WRAP_FOR_AGG_PARTIAL_BLOCK_START
executeAndAssertColumnsEqual(request, expect);
WRAP_FOR_AGG_FAILPOINTS_END
WRAP_FOR_AGG_PARTIAL_BLOCK_END
}
}
}
Expand Down Expand Up @@ -798,7 +791,7 @@ try
"group_by_two_level_threshold",
Field(static_cast<UInt64>(two_level_threshold)));
context.context->setSetting("max_block_size", Field(static_cast<UInt64>(block_size)));
WRAP_FOR_AGG_FAILPOINTS_START
WRAP_FOR_AGG_PARTIAL_BLOCK_START
auto blocks = getExecuteStreamsReturnBlocks(request, concurrency);
size_t actual_row = 0;
for (auto & block : blocks)
Expand All @@ -807,7 +800,7 @@ try
actual_row += block.rows();
}
ASSERT_EQ(actual_row, expect_rows[i]);
WRAP_FOR_AGG_FAILPOINTS_END
WRAP_FOR_AGG_PARTIAL_BLOCK_END
}
}
}
Expand Down Expand Up @@ -921,7 +914,7 @@ try
"group_by_two_level_threshold",
Field(static_cast<UInt64>(two_level_threshold)));
context.context->setSetting("max_block_size", Field(static_cast<UInt64>(block_size)));
WRAP_FOR_AGG_FAILPOINTS_START
WRAP_FOR_AGG_PARTIAL_BLOCK_START
auto blocks = getExecuteStreamsReturnBlocks(request, concurrency);
for (auto & block : blocks)
{
Expand All @@ -946,7 +939,7 @@ try
vstackBlocks(std::move(blocks)).getColumnsWithTypeAndName(),
false));
}
WRAP_FOR_AGG_FAILPOINTS_END
WRAP_FOR_AGG_PARTIAL_BLOCK_END
}
}
}
Expand Down Expand Up @@ -974,18 +967,18 @@ try

request = context.receive("empty_recv", 5).aggregation({Max(col("s1"))}, {col("s2")}, 5).build(context);
{
WRAP_FOR_AGG_FAILPOINTS_START
WRAP_FOR_AGG_PARTIAL_BLOCK_START
executeAndAssertColumnsEqual(request, {});
WRAP_FOR_AGG_FAILPOINTS_END
WRAP_FOR_AGG_PARTIAL_BLOCK_END
}

request = context.scan("test_db", "empty_table")
.aggregation({Count(lit(Field(static_cast<UInt64>(1))))}, {})
.build(context);
{
WRAP_FOR_AGG_FAILPOINTS_START
WRAP_FOR_AGG_PARTIAL_BLOCK_START
executeAndAssertColumnsEqual(request, {toVec<UInt64>({0})});
WRAP_FOR_AGG_FAILPOINTS_END
WRAP_FOR_AGG_PARTIAL_BLOCK_END
}
}
CATCH
Expand Down Expand Up @@ -1042,31 +1035,6 @@ try
toVec<Int8>("col_tinyint", col_data_tinyint),
});

std::random_device rd;
std::mt19937_64 gen(rd());

std::vector<size_t> max_block_sizes{1, 2, DEFAULT_BLOCK_SIZE};
std::vector<UInt64> two_level_thresholds{0, 1};

std::uniform_int_distribution<size_t> dist(0, max_block_sizes.size());
size_t random_block_size = max_block_sizes[dist(gen)];

std::uniform_int_distribution<size_t> dist1(0, two_level_thresholds.size());
size_t random_two_level_threshold = two_level_thresholds[dist1(gen)];
LOG_DEBUG(
Logger::get("AggExecutorTestRunner::AggKeyOptimization"),
"max_block_size: {}, two_level_threshold: {}",
random_block_size,
random_two_level_threshold);

context.context->setSetting("group_by_two_level_threshold_bytes", Field(static_cast<UInt64>(0)));
#define WRAP_FOR_AGG_CHANGE_SETTINGS \
context.context->setSetting( \
"group_by_two_level_threshold", \
Field(static_cast<UInt64>(random_two_level_threshold))); \
context.context->setSetting("max_block_size", Field(static_cast<UInt64>(random_block_size)));

FailPointHelper::enableFailPoint(FailPoints::force_agg_prefetch);
{
// case-1: select count(1), col_tinyint from t group by col_int, col_tinyint
// agg method: keys64(AggregationMethodKeysFixed)
Expand All @@ -1081,7 +1049,6 @@ try
toNullableVec<Int8>("first_row(col_tinyint)", ColumnWithNullableInt8{0, 1, 2, 3}),
toVec<Int32>("col_int", ColumnWithInt32{0, 1, 2, 3}),
toVec<Int8>("col_tinyint", ColumnWithInt8{0, 1, 2, 3})};
WRAP_FOR_AGG_CHANGE_SETTINGS
executeAndAssertColumnsEqual(request, expected);
}

Expand All @@ -1098,7 +1065,6 @@ try
= {toVec<UInt64>("count(1)", ColumnWithUInt64{rows_per_type, rows_per_type, rows_per_type, rows_per_type}),
toNullableVec<Int32>("first_row(col_int)", ColumnWithNullableInt32{0, 1, 2, 3}),
toVec<Int32>("col_int", ColumnWithInt32{0, 1, 2, 3})};
WRAP_FOR_AGG_CHANGE_SETTINGS
executeAndAssertColumnsEqual(request, expected);
}

Expand Down Expand Up @@ -1133,7 +1099,6 @@ try
toNullableVec<String>("first_row(col_string_with_collator)", ColumnWithNullableString{"a", "b", "c", "d"}),
toVec<String>("col_string_with_collator", ColumnWithString{"a", "b", "c", "d"}),
};
WRAP_FOR_AGG_CHANGE_SETTINGS
executeAndAssertColumnsEqual(request, expected);
}

Expand All @@ -1151,7 +1116,6 @@ try
toVec<UInt64>("count(1)", ColumnWithUInt64{rows_per_type, rows_per_type, rows_per_type, rows_per_type}),
toVec<String>("first_row(col_string_with_collator)", ColumnWithString{"a", "b", "c", "d"}),
};
WRAP_FOR_AGG_CHANGE_SETTINGS
executeAndAssertColumnsEqual(request, expected);
}

Expand All @@ -1174,7 +1138,6 @@ try
toVec<Int32>("col_int", ColumnWithInt32{0, 1, 2, 3}),
toVec<String>("col_string_no_collator", ColumnWithString{"a", "b", "c", "d"}),
};
WRAP_FOR_AGG_CHANGE_SETTINGS
executeAndAssertColumnsEqual(request, expected);
}

Expand All @@ -1192,11 +1155,8 @@ try
toNullableVec<String>("first_row(col_string_with_collator)", ColumnWithNullableString{"a", "b", "c", "d"}),
toVec<String>("col_string_with_collator", ColumnWithString{"a", "b", "c", "d"}),
toVec<Int32>("col_int", ColumnWithInt32{0, 1, 2, 3})};
WRAP_FOR_AGG_CHANGE_SETTINGS
executeAndAssertColumnsEqual(request, expected);
}
FailPointHelper::disableFailPoint(FailPoints::force_agg_prefetch);
#undef WRAP_FOR_AGG_CHANGE_SETTINGS
}
CATCH

Expand Down Expand Up @@ -1227,9 +1187,13 @@ try

context
.addExchangeReceiver("exchange_receiver_1_concurrency", column_infos, column_data, 1, partition_column_infos);
context
.addExchangeReceiver("exchange_receiver_3_concurrency", column_infos, column_data, 3, partition_column_infos);
context
.addExchangeReceiver("exchange_receiver_5_concurrency", column_infos, column_data, 5, partition_column_infos);
context
.addExchangeReceiver("exchange_receiver_10_concurrency", column_infos, column_data, 10, partition_column_infos);
std::vector<size_t> exchange_receiver_concurrency = {1, 10};
std::vector<size_t> exchange_receiver_concurrency = {1, 3, 5, 10};

auto gen_request = [&](size_t exchange_concurrency) {
return context
Expand All @@ -1241,15 +1205,15 @@ try
auto baseline = executeStreams(gen_request(1), 1);
for (size_t exchange_concurrency : exchange_receiver_concurrency)
{
WRAP_FOR_AGG_FAILPOINTS_START
WRAP_FOR_AGG_PARTIAL_BLOCK_START
executeAndAssertColumnsEqual(gen_request(exchange_concurrency), baseline);
WRAP_FOR_AGG_FAILPOINTS_END
WRAP_FOR_AGG_PARTIAL_BLOCK_END
}
}
CATCH

#undef WRAP_FOR_AGG_FAILPOINTS_START
#undef WRAP_FOR_AGG_FAILPOINTS_END
#undef WRAP_FOR_AGG_PARTIAL_BLOCK_START
#undef WRAP_FOR_AGG_PARTIAL_BLOCK_END

} // namespace tests
} // namespace DB

0 comments on commit e33075a

Please sign in to comment.