Skip to content

Commit

Permalink
Add spill pct utility (facebookincubator#8921)
Browse files Browse the repository at this point in the history
Summary:
Adds global utility TestSpillUtils and add apply that to all except for aggregation. This is because presto-native depends on the original testingSpillPct for testing on aggregation path. We will take care of aggregation path in followup PRs.

Pull Request resolved: facebookincubator#8921

Reviewed By: xiaoxmeng

Differential Revision: D54379114

Pulled By: tanjialiang

fbshipit-source-id: 7d61d9ff3b783d81da111f08b829fed8a130ca18
  • Loading branch information
tanjialiang authored and facebook-github-bot committed Mar 2, 2024
1 parent a8aad60 commit a6672eb
Show file tree
Hide file tree
Showing 18 changed files with 115 additions and 72 deletions.
9 changes: 0 additions & 9 deletions velox/exec/HashBuild.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1107,15 +1107,6 @@ std::string HashBuild::stateName(State state) {
}
}

bool HashBuild::testingTriggerSpill() {
// Test-only spill path.
if (spillConfig()->testSpillPct == 0) {
return false;
}
return folly::hasher<uint64_t>()(++spillTestCounter_) % 100 <=
spillConfig()->testSpillPct;
}

void HashBuild::reclaim(
uint64_t /*unused*/,
memory::MemoryReclaimer::Stats& stats) {
Expand Down
7 changes: 0 additions & 7 deletions velox/exec/HashBuild.h
Original file line number Diff line number Diff line change
Expand Up @@ -236,9 +236,6 @@ class HashBuild final : public Operator {

void addRuntimeStats();

// Invoked to check if it needs to trigger spilling for test purpose only.
bool testingTriggerSpill();

// Indicates if this hash build operator is under non-reclaimable state or
// not.
bool nonReclaimableState() const;
Expand Down Expand Up @@ -307,10 +304,6 @@ class HashBuild final : public Operator {
// at least one entry with null join keys.
bool joinHasNullKeys_{false};

// Counts input batches and triggers spilling if folly hash of this % 100 <=
// 'testSpillPct_';.
uint64_t spillTestCounter_{0};

// The spill targets set by 'requestSpill()' to request group spill.
uint64_t numSpillRows_{0};
uint64_t numSpillBytes_{0};
Expand Down
2 changes: 1 addition & 1 deletion velox/exec/RowNumber.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ void RowNumber::ensureInputFits(const RowVectorPtr& input) {
const auto outOfLineBytesPerRow = outOfLineBytes / numDistinct;

// Test-only spill path.
if (spillConfig_->testSpillPct > 0) {
if (testingTriggerSpill()) {
spill();
return;
}
Expand Down
4 changes: 1 addition & 3 deletions velox/exec/SortBuffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -194,9 +194,7 @@ void SortBuffer::ensureInputFits(const VectorPtr& input) {
const int64_t flatInputBytes = input->estimateFlatSize();

// Test-only spill path.
if (numRows > 0 && spillConfig_->testSpillPct &&
(folly::hasher<uint64_t>()(++spillTestCounter_)) % 100 <=
spillConfig_->testSpillPct) {
if (numRows > 0 && testingTriggerSpill()) {
spill();
return;
}
Expand Down
2 changes: 0 additions & 2 deletions velox/exec/SortBuffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,6 @@ class SortBuffer {
// Records the source rows to copy to 'output_' in order.
std::vector<const RowVector*> spillSources_;
std::vector<vector_size_t> spillSourceRows_;
// Counts input batches to trigger spilling for test.
uint64_t spillTestCounter_{0};

// Reusable output vector.
RowVectorPtr output_;
Expand Down
2 changes: 1 addition & 1 deletion velox/exec/SortWindowBuild.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ void SortWindowBuild::ensureInputFits(const RowVectorPtr& input) {
}

// Test-only spill path.
if (spillConfig_->testSpillPct > 0) {
if (testingTriggerSpill()) {
spill();
return;
}
Expand Down
34 changes: 34 additions & 0 deletions velox/exec/Spill.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -279,4 +279,38 @@ SpillPartitionIdSet toSpillPartitionIdSet(
}
return partitionIdSet;
}

tsan_atomic<int32_t>& testingSpillPct() {
static tsan_atomic<int32_t> spillPct = 0;
return spillPct;
}

tsan_atomic<int32_t>& testingSpillCounter() {
static tsan_atomic<int32_t> spillCounter = 0;
return spillCounter;
}

TestScopedSpillInjection::TestScopedSpillInjection(
int32_t spillPct,
int32_t maxInjections) {
VELOX_CHECK_EQ(testingSpillCounter(), 0);
testingSpillPct() = spillPct;
testingSpillCounter() = maxInjections;
}

TestScopedSpillInjection::~TestScopedSpillInjection() {
testingSpillPct() = 0;
testingSpillCounter() = 0;
}

bool testingTriggerSpill() {
// Do not evaluate further if trigger is not set.
if (testingSpillCounter() <= 0 || testingSpillPct() <= 0) {
return false;
}
if (folly::Random::rand32() % 100 < testingSpillPct()) {
return testingSpillCounter()-- > 0;
}
return false;
}
} // namespace facebook::velox::exec
21 changes: 21 additions & 0 deletions velox/exec/Spill.h
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,27 @@ class SpillState {
/// Generate partition id set from given spill partition set.
SpillPartitionIdSet toSpillPartitionIdSet(
const SpillPartitionSet& partitionSet);

/// Scoped spill percentage utility that allows user to set the behavior of
/// triggered spill.
/// 'spillPct' indicates the chance of triggering spilling. 100% means spill
/// will always be triggered.
/// 'maxInjections' indicates the max number of actual triggering. e.g. when
/// 'spillPct' is 20 and 'maxInjections' is 10, continuous calls to
/// testingTriggerSpill() will keep rolling the dice that has a chance of 20%
/// triggering until 10 triggers have been invoked.
class TestScopedSpillInjection {
public:
explicit TestScopedSpillInjection(
int32_t spillPct,
int32_t maxInjections = std::numeric_limits<int32_t>::max());

~TestScopedSpillInjection();
};

/// Test utility that returns true if triggered spill is evaluated to happen,
/// false otherwise.
bool testingTriggerSpill();
} // namespace facebook::velox::exec

// Adding the custom hash for SpillPartitionId to std::hash to make it usable
Expand Down
2 changes: 1 addition & 1 deletion velox/exec/TopNRowNumber.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -675,7 +675,7 @@ void TopNRowNumber::ensureInputFits(const RowVectorPtr& input) {
}

// Test-only spill path.
if (spillConfig_->testSpillPct > 0) {
if (testingTriggerSpill()) {
spill();
return;
}
Expand Down
4 changes: 3 additions & 1 deletion velox/exec/tests/HashJoinTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -586,6 +586,7 @@ class HashJoinBuilder {
}
auto queryCtx = std::make_shared<core::QueryCtx>(executor_);
std::shared_ptr<TempDirectoryPath> spillDirectory;
int32_t spillPct{0};
if (injectSpill) {
spillDirectory = exec::test::TempDirectoryPath::create();
builder.spillDirectory(spillDirectory->path);
Expand All @@ -595,7 +596,7 @@ class HashJoinBuilder {
// Disable write buffering to ease test verification. For example, we want
// many spilled vectors in a spilled file to trigger recursive spilling.
config(core::QueryConfig::kSpillWriteBufferSize, std::to_string(0));
config(core::QueryConfig::kTestingSpillPct, "100");
spillPct = 100;
} else if (spillMemoryThreshold_ != 0) {
spillDirectory = exec::test::TempDirectoryPath::create();
builder.spillDirectory(spillDirectory->path);
Expand Down Expand Up @@ -636,6 +637,7 @@ class HashJoinBuilder {
ASSERT_EQ(memory::spillMemoryPool()->stats().currentBytes, 0);
const uint64_t peakSpillMemoryUsage =
memory::spillMemoryPool()->stats().peakBytes;
TestScopedSpillInjection scopedSpillInjection(spillPct);
auto task = builder.assertResults(referenceQuery_);
const auto statsPair = taskSpilledStats(*task);
if (injectSpill) {
Expand Down
4 changes: 3 additions & 1 deletion velox/exec/tests/JoinFuzzer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -292,14 +292,16 @@ RowVectorPtr JoinFuzzer::execute(const PlanWithSplits& plan, bool injectSpill) {
}

std::shared_ptr<TempDirectoryPath> spillDirectory;
int32_t spillPct{0};
if (injectSpill) {
spillDirectory = exec::test::TempDirectoryPath::create();
builder.config(core::QueryConfig::kSpillEnabled, "true")
.config(core::QueryConfig::kAggregationSpillEnabled, "true")
.config(core::QueryConfig::kTestingSpillPct, "100")
.spillDirectory(spillDirectory->path);
spillPct = 100;
}

TestScopedSpillInjection scopedSpillInjection(spillPct);
auto result = builder.maxDrivers(2).copyResults(pool_.get());
LOG(INFO) << "Results: " << result->toString();
if (VLOG_IS_ON(1)) {
Expand Down
2 changes: 1 addition & 1 deletion velox/exec/tests/OrderByTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -196,8 +196,8 @@ class OrderByTest : public OperatorTestBase {
SCOPED_TRACE("run with spilling");
auto spillDirectory = exec::test::TempDirectoryPath::create();
auto queryCtx = std::make_shared<core::QueryCtx>(executor_.get());
TestScopedSpillInjection scopedSpillInjection(100);
queryCtx->testingOverrideConfigUnsafe({
{core::QueryConfig::kTestingSpillPct, "100"},
{core::QueryConfig::kSpillEnabled, "true"},
{core::QueryConfig::kOrderBySpillEnabled, "true"},
});
Expand Down
6 changes: 3 additions & 3 deletions velox/exec/tests/RowNumberTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ TEST_F(RowNumberTest, spill) {
makeFlatVector<int64_t>({vectorSize, vectorSize, vectorSize}),
});

TestScopedSpillInjection scopedSpillInjection(100);
auto task = AssertQueryBuilder(plan)
.config(core::QueryConfig::kTestingSpillPct, 100)
.config(core::QueryConfig::kSpillEnabled, true)
.config(core::QueryConfig::kRowNumberSpillEnabled, true)
.spillDirectory(spillDirectory->path)
Expand Down Expand Up @@ -242,12 +242,12 @@ TEST_F(RowNumberTest, maxSpillBytes) {
for (const auto& testData : testSettings) {
SCOPED_TRACE(testData.debugString());
try {
TestScopedSpillInjection scopedSpillInjection(100);
AssertQueryBuilder(plan)
.spillDirectory(spillDirectory->path)
.queryCtx(queryCtx)
.config(core::QueryConfig::kSpillEnabled, true)
.config(core::QueryConfig::kRowNumberSpillEnabled, true)
.config(core::QueryConfig::kTestingSpillPct, 100)
.config(core::QueryConfig::kMaxSpillBytes, testData.maxSpilledBytes)
.copyResults(pool_.get());
ASSERT_FALSE(testData.expectedExceedLimit);
Expand Down Expand Up @@ -284,12 +284,12 @@ TEST_F(RowNumberTest, memoryUsage) {
const std::string spillEnableConfig = std::to_string(spillEnable);

std::shared_ptr<Task> task;
TestScopedSpillInjection scopedSpillInjection(100);
AssertQueryBuilder(plan)
.spillDirectory(spillDirectory->path)
.queryCtx(queryCtx)
.config(core::QueryConfig::kSpillEnabled, spillEnableConfig)
.config(core::QueryConfig::kRowNumberSpillEnabled, spillEnableConfig)
.config(core::QueryConfig::kTestingSpillPct, "100")
.spillDirectory(spillDirectory->path)
.copyResults(pool_.get(), task);

Expand Down
1 change: 1 addition & 0 deletions velox/exec/tests/SortBufferTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,7 @@ TEST_F(SortBufferTest, batchOutput) {
{false, {1024, 1024, 1024}, 1000, {1000, 1000, 1000, 72}},
{true, {1024, 1024, 1024}, 1000, {1000, 1000, 1000, 72}}};

TestScopedSpillInjection scopedSpillInjection(100);
for (const auto& testData : testSettings) {
SCOPED_TRACE(testData.debugString());
auto spillDirectory = exec::test::TempDirectoryPath::create();
Expand Down
6 changes: 3 additions & 3 deletions velox/exec/tests/TableWriteTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,7 @@ class TableWriteTest : public HiveConnectorTestBase {
.assertResults(duckDbSql);
}
const auto spillDirectory = exec::test::TempDirectoryPath::create();
TestScopedSpillInjection scopedSpillInjection(100);
return AssertQueryBuilder(plan, duckDbQueryRunner_)
.spillDirectory(spillDirectory->path)
.maxDrivers(
Expand All @@ -286,7 +287,6 @@ class TableWriteTest : public HiveConnectorTestBase {
std::to_string(numPartitionedTableWriterCount_))
.config(core::QueryConfig::kSpillEnabled, "true")
.config(QueryConfig::kWriterSpillEnabled, "true")
.config(QueryConfig::kTestingSpillPct, "100")
.splits(splits)
.assertResults(duckDbSql);
}
Expand All @@ -296,6 +296,7 @@ class TableWriteTest : public HiveConnectorTestBase {
const std::string& duckDbSql,
bool enableSpill = false) {
if (!enableSpill) {
TestScopedSpillInjection scopedSpillInjection(100);
return AssertQueryBuilder(plan, duckDbQueryRunner_)
.maxDrivers(
2 *
Expand All @@ -308,11 +309,11 @@ class TableWriteTest : public HiveConnectorTestBase {
std::to_string(numPartitionedTableWriterCount_))
.config(core::QueryConfig::kSpillEnabled, "true")
.config(QueryConfig::kWriterSpillEnabled, "true")
.config(QueryConfig::kTestingSpillPct, "100")
.assertResults(duckDbSql);
}

const auto spillDirectory = exec::test::TempDirectoryPath::create();
TestScopedSpillInjection scopedSpillInjection(100);
return AssertQueryBuilder(plan, duckDbQueryRunner_)
.spillDirectory(spillDirectory->path)
.maxDrivers(
Expand All @@ -324,7 +325,6 @@ class TableWriteTest : public HiveConnectorTestBase {
std::to_string(numPartitionedTableWriterCount_))
.config(core::QueryConfig::kSpillEnabled, "true")
.config(QueryConfig::kWriterSpillEnabled, "true")
.config(QueryConfig::kTestingSpillPct, "100")
.assertResults(duckDbSql);
}

Expand Down
5 changes: 2 additions & 3 deletions velox/exec/tests/TaskTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1372,7 +1372,6 @@ TEST_F(TaskTest, spillDirectoryLifecycleManagement) {
makeFlatVector<int64_t>(1'000, [](auto row) { return row % 300; }),
makeFlatVector<int64_t>(1'000, [](auto row) { return row; }),
});

core::PlanNodeId aggrNodeId;
const auto plan = PlanBuilder()
.values({data})
Expand Down Expand Up @@ -1441,9 +1440,9 @@ TEST_F(TaskTest, spillDirNotCreated) {
params.queryCtx = std::make_shared<core::QueryCtx>(driverExecutor_.get());
params.queryCtx->testingOverrideConfigUnsafe(
{{core::QueryConfig::kSpillEnabled, "true"},
{core::QueryConfig::kJoinSpillEnabled, "true"},
{core::QueryConfig::kTestingSpillPct, "0"}});
{core::QueryConfig::kJoinSpillEnabled, "true"}});
params.maxDrivers = 1;
TestScopedSpillInjection scopedSpillInjection(100);

auto cursor = TaskCursor::create(params);
auto* task = cursor->task().get();
Expand Down
Loading

0 comments on commit a6672eb

Please sign in to comment.