Skip to content

Commit

Permalink
Improve spiller constructors (facebookincubator#8896)
Browse files Browse the repository at this point in the history
Summary: Pull Request resolved: facebookincubator#8896

Reviewed By: bikramSingh91, tanjialiang

Differential Revision: D54318936

Pulled By: xiaoxmeng
  • Loading branch information
xiaoxmeng authored and facebook-github-bot committed Feb 28, 2024
1 parent 6a68ddd commit 60cb5b4
Show file tree
Hide file tree
Showing 11 changed files with 22 additions and 37 deletions.
3 changes: 1 addition & 2 deletions velox/exec/HashBuild.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -241,8 +241,7 @@ void HashBuild::setupSpiller(SpillPartition* spillPartition) {
table_->rows(),
tableType_,
std::move(hashBits),
&spillConfig,
spillConfig.maxFileSize);
&spillConfig);

const int32_t numPartitions = spiller_->hashBits().numPartitions();
spillInputIndicesBuffers_.resize(numPartitions);
Expand Down
4 changes: 0 additions & 4 deletions velox/exec/HashBuild.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,6 @@ class HashBuild final : public Operator {
return spillConfig_.has_value();
}

const common::SpillConfig* spillConfig() const {
return spillConfig_.has_value() ? &spillConfig_.value() : nullptr;
}

void recordSpillStats();
void recordSpillStats(Spiller* spiller);

Expand Down
3 changes: 1 addition & 2 deletions velox/exec/HashProbe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -253,8 +253,7 @@ void HashProbe::maybeSetupSpillInput(
spillInputPartitionIds_.begin()->partitionBitOffset(),
spillInputPartitionIds_.begin()->partitionBitOffset() +
spillConfig.joinPartitionBits),
&spillConfig,
spillConfig.maxFileSize);
&spillConfig);
// Set the spill partitions to the corresponding ones at the build side. The
// hash probe operator itself won't trigger any spilling.
spiller_->setPartitionsSpilled(toPartitionNumSet(spillInputPartitionIds_));
Expand Down
4 changes: 4 additions & 0 deletions velox/exec/Operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -661,6 +661,10 @@ class Operator : public BaseRuntimeStatWriter {
return spillConfig_.has_value();
}

const common::SpillConfig* spillConfig() const {
return spillConfig_.has_value() ? &spillConfig_.value() : nullptr;
}

/// Creates output vector from 'input_' and 'results' according to
/// 'identityProjections_' and 'resultProjections_'. If 'mapping' is set to
/// nullptr, the children of the output vector will be identical to their
Expand Down
2 changes: 1 addition & 1 deletion velox/exec/ProbeOperatorState.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ enum class ProbeOperatorState {
kWaitForBuild = 0,
/// The running state that join the probe input with the build table.
kRunning = 1,
/// This state has different handlings for hash and nested loop join probe.
/// This state has different handling for hash and nested loop join probe.
/// For hash probe, wait for all the peer probe operators to finish processing
/// inputs.
/// This state only applies when disk spilling is enabled. The last finished
Expand Down
2 changes: 1 addition & 1 deletion velox/exec/RowContainer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -861,7 +861,7 @@ void RowContainer::extractProbedFlags(
if (nullResult) {
flatResult->setNull(i, true);
} else {
bool probed = bits::isBitSet(rows[i], probedFlagOffset_);
const bool probed = bits::isBitSet(rows[i], probedFlagOffset_);
if (setNullForNonProbedRow && !probed) {
flatResult->setNull(i, true);
} else {
Expand Down
9 changes: 2 additions & 7 deletions velox/exec/RowNumber.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -402,8 +402,7 @@ void RowNumber::setupHashTableSpiller() {
table_->rows(),
tableType,
std::move(hashBits),
&spillConfig,
spillConfig.maxFileSize);
&spillConfig);
}

void RowNumber::setupInputSpiller() {
Expand All @@ -412,11 +411,7 @@ void RowNumber::setupInputSpiller() {

// TODO Replace Spiller::Type::kHashJoinProbe.
inputSpiller_ = std::make_unique<Spiller>(
Spiller::Type::kHashJoinProbe,
inputType_,
hashBits,
&spillConfig,
spillConfig.maxFileSize);
Spiller::Type::kHashJoinProbe, inputType_, hashBits, &spillConfig);

const auto& hashers = table_->hashers();

Expand Down
10 changes: 4 additions & 6 deletions velox/exec/Spiller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,7 @@ Spiller::Spiller(
Type type,
RowTypePtr rowType,
HashBitRange bits,
const common::SpillConfig* spillConfig,
uint64_t targetFileSize)
const common::SpillConfig* spillConfig)
: Spiller(
type,
nullptr,
Expand All @@ -108,7 +107,7 @@ Spiller::Spiller(
spillConfig->getSpillDirPathCb,
spillConfig->updateAndCheckSpillLimitCb,
spillConfig->fileNamePrefix,
targetFileSize,
spillConfig->maxFileSize,
spillConfig->writeBufferSize,
spillConfig->compressionKind,
spillConfig->executor,
Expand All @@ -126,8 +125,7 @@ Spiller::Spiller(
RowContainer* container,
RowTypePtr rowType,
HashBitRange bits,
const common::SpillConfig* spillConfig,
uint64_t targetFileSize)
const common::SpillConfig* spillConfig)
: Spiller(
type,
container,
Expand All @@ -138,7 +136,7 @@ Spiller::Spiller(
spillConfig->getSpillDirPathCb,
spillConfig->updateAndCheckSpillLimitCb,
spillConfig->fileNamePrefix,
targetFileSize,
spillConfig->maxFileSize,
spillConfig->writeBufferSize,
spillConfig->compressionKind,
spillConfig->executor,
Expand Down
6 changes: 2 additions & 4 deletions velox/exec/Spiller.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,17 +71,15 @@ class Spiller {
Type type,
RowTypePtr rowType,
HashBitRange bits,
const common::SpillConfig* spillConfig,
uint64_t targetFileSize);
const common::SpillConfig* spillConfig);

/// type == Type::kHashJoinBuild
Spiller(
Type type,
RowContainer* container,
RowTypePtr rowType,
HashBitRange bits,
const common::SpillConfig* spillConfig,
uint64_t targetFileSize);
const common::SpillConfig* spillConfig);

Type type() const {
return type_;
Expand Down
4 changes: 2 additions & 2 deletions velox/exec/tests/JoinSpillInputBenchmarkBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,15 @@ void JoinSpillInputBenchmarkBase::setUp() {
spillConfig.executor = executor_.get();
spillConfig.compressionKind =
stringToCompressionKind(FLAGS_spiller_benchmark_compression_kind);
spillConfig.maxFileSize = FLAGS_spiller_benchmark_max_spill_file_size;
spillConfig.maxSpillRunRows = 0;
spillConfig.fileCreateConfig = {};

spiller_ = std::make_unique<Spiller>(
exec::Spiller::Type::kHashJoinProbe,
rowType_,
HashBitRange{29, 29},
&spillConfig,
FLAGS_spiller_benchmark_max_spill_file_size);
&spillConfig);
spiller_->setPartitionsSpilled({0});
}

Expand Down
12 changes: 4 additions & 8 deletions velox/exec/tests/SpillerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -486,12 +486,13 @@ class SpillerTest : public exec::test::RowContainerTestBase {
spillConfig.executor = executor();
spillConfig.compressionKind = compressionKind_;
spillConfig.maxSpillRunRows = maxSpillRunRows;
spillConfig.maxFileSize = targetFileSize;
spillConfig.fileCreateConfig = {};

if (type_ == Spiller::Type::kHashJoinProbe) {
// kHashJoinProbe doesn't have associated row container.
spiller_ = std::make_unique<Spiller>(
type_, rowType_, hashBits_, &spillConfig, targetFileSize);
spiller_ =
std::make_unique<Spiller>(type_, rowType_, hashBits_, &spillConfig);
} else if (
type_ == Spiller::Type::kOrderByInput ||
type_ == Spiller::Type::kAggregateInput) {
Expand All @@ -512,12 +513,7 @@ class SpillerTest : public exec::test::RowContainerTestBase {
} else {
VELOX_CHECK_EQ(type_, Spiller::Type::kHashJoinBuild);
spiller_ = std::make_unique<Spiller>(
type_,
rowContainer_.get(),
rowType_,
hashBits_,
&spillConfig,
targetFileSize);
type_, rowContainer_.get(), rowType_, hashBits_, &spillConfig);
}
ASSERT_EQ(spiller_->state().maxPartitions(), numPartitions_);
ASSERT_FALSE(spiller_->isAllSpilled());
Expand Down

0 comments on commit 60cb5b4

Please sign in to comment.