Skip to content

Commit

Permalink
Add more debug info for flaky aggregation spill stats check
Browse files Browse the repository at this point in the history
  • Loading branch information
xiaoxmeng committed Apr 10, 2024
1 parent 89de5d3 commit fb44178
Showing 1 changed file with 22 additions and 9 deletions.
31 changes: 22 additions & 9 deletions velox/functions/lib/aggregates/tests/utils/AggregationTestBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,7 @@ void AggregationTestBase::testAggregationsWithCompanion(
// Spilling needs at least 2 batches of input. Use round-robin
// repartitioning to split input into multiple batches.
core::PlanNodeId partialNodeId;
core::PlanNodeId finalNodeId;
builder.localPartitionRoundRobinRow()
.partialAggregation(groupingKeysWithPartialKey, paritialAggregates)
.capturePlanNodeId(partialNodeId)
Expand All @@ -390,6 +391,7 @@ void AggregationTestBase::testAggregationsWithCompanion(
.capturePlanNodeId(partialNodeId)
.localPartition(groupingKeys)
.finalAggregation()
.capturePlanNodeId(finalNodeId)
.project(extractExpressions);

if (!postAggregationProjections.empty()) {
Expand All @@ -409,18 +411,22 @@ void AggregationTestBase::testAggregationsWithCompanion(
auto task = assertResults(queryBuilder);

// Expect > 0 spilled bytes unless there was no input.
const auto inputRows =
const auto partialInputRows =
toPlanStats(task->taskStats()).at(partialNodeId).inputRows;
if (inputRows > 1) {
const auto finalInputRows =
toPlanStats(task->taskStats()).at(finalNodeId).inputRows;
if (partialInputRows > 1) {
EXPECT_LT(0, spilledBytes(*task))
<< "inputRows: " << inputRows
<< "partial inputRows: " << partialInputRows
<< " final inputRows: " << finalInputRows
<< " spilledRows: " << spilledRows(*task)
<< " spilledInputBytes: " << spilledInputBytes(*task)
<< " spilledFiles: " << spilledFiles(*task)
<< " injectedSpills: " << exec::injectedSpillCount();
} else {
EXPECT_EQ(0, spilledBytes(*task))
<< "inputRows: " << inputRows
<< "partial inputRows: " << partialInputRows
<< " final inputRows: " << finalInputRows
<< " spilledRows: " << spilledRows(*task)
<< " spilledInputBytes: " << spilledInputBytes(*task)
<< " spilledFiles: " << spilledFiles(*task)
Expand Down Expand Up @@ -816,11 +822,13 @@ void AggregationTestBase::testAggregationsImpl(
// Spilling needs at least 2 batches of input. Use round-robin
// repartitioning to split input into multiple batches.
core::PlanNodeId partialNodeId;
core::PlanNodeId finalNodeId;
builder.localPartitionRoundRobinRow()
.partialAggregation(groupingKeys, aggregates)
.capturePlanNodeId(partialNodeId)
.localPartition(groupingKeys)
.finalAggregation();
.finalAggregation()
.capturePlanNodeId(finalNodeId);

if (!postAggregationProjections.empty()) {
builder.project(postAggregationProjections);
Expand All @@ -842,10 +850,14 @@ void AggregationTestBase::testAggregationsImpl(
auto task = assertResults(queryBuilder);

// Expect > 0 spilled bytes unless there was no input.
auto inputRows = toPlanStats(task->taskStats()).at(partialNodeId).inputRows;
if (inputRows > 1) {
const auto partialInputRows =
toPlanStats(task->taskStats()).at(partialNodeId).inputRows;
const auto finalInputRows =
toPlanStats(task->taskStats()).at(finalNodeId).inputRows;
if (partialInputRows > 1) {
EXPECT_LT(0, spilledBytes(*task))
<< "inputRows: " << inputRows
<< "partial inputRows: " << partialInputRows
<< " final inputRows: " << finalInputRows
<< " spilledRows: " << spilledRows(*task)
<< " spilledInputBytes: " << spilledInputBytes(*task)
<< " spilledFiles: " << spilledFiles(*task)
Expand All @@ -856,7 +868,8 @@ void AggregationTestBase::testAggregationsImpl(
memory::spillMemoryPool()->stats().peakBytes, peakSpillMemoryUsage);
} else {
EXPECT_EQ(0, spilledBytes(*task))
<< "inputRows: " << inputRows
<< "partial inputRows: " << partialInputRows
<< " final inputRows: " << finalInputRows
<< " spilledRows: " << spilledRows(*task)
<< " spilledInputBytes: " << spilledInputBytes(*task)
<< " spilledFiles: " << spilledFiles(*task)
Expand Down

0 comments on commit fb44178

Please sign in to comment.