diff --git a/velox/exec/HashBuild.cpp b/velox/exec/HashBuild.cpp index 86bb552ac418..a1805e69cd2e 100644 --- a/velox/exec/HashBuild.cpp +++ b/velox/exec/HashBuild.cpp @@ -241,8 +241,7 @@ void HashBuild::setupSpiller(SpillPartition* spillPartition) { table_->rows(), tableType_, std::move(hashBits), - &spillConfig, - spillConfig.maxFileSize); + &spillConfig); const int32_t numPartitions = spiller_->hashBits().numPartitions(); spillInputIndicesBuffers_.resize(numPartitions); diff --git a/velox/exec/HashBuild.h b/velox/exec/HashBuild.h index 6359569fccfd..84366f5cff2b 100644 --- a/velox/exec/HashBuild.h +++ b/velox/exec/HashBuild.h @@ -118,10 +118,6 @@ class HashBuild final : public Operator { return spillConfig_.has_value(); } - const common::SpillConfig* spillConfig() const { - return spillConfig_.has_value() ? &spillConfig_.value() : nullptr; - } - void recordSpillStats(); void recordSpillStats(Spiller* spiller); diff --git a/velox/exec/HashProbe.cpp b/velox/exec/HashProbe.cpp index 03f90d66bebc..d02e2af1c770 100644 --- a/velox/exec/HashProbe.cpp +++ b/velox/exec/HashProbe.cpp @@ -253,8 +253,7 @@ void HashProbe::maybeSetupSpillInput( spillInputPartitionIds_.begin()->partitionBitOffset(), spillInputPartitionIds_.begin()->partitionBitOffset() + spillConfig.joinPartitionBits), - &spillConfig, - spillConfig.maxFileSize); + &spillConfig); // Set the spill partitions to the corresponding ones at the build side. The // hash probe operator itself won't trigger any spilling. spiller_->setPartitionsSpilled(toPartitionNumSet(spillInputPartitionIds_)); diff --git a/velox/exec/Operator.h b/velox/exec/Operator.h index b00dda0d958b..8479c6ba9c6b 100644 --- a/velox/exec/Operator.h +++ b/velox/exec/Operator.h @@ -661,6 +661,10 @@ class Operator : public BaseRuntimeStatWriter { return spillConfig_.has_value(); } + const common::SpillConfig* spillConfig() const { + return spillConfig_.has_value() ? &spillConfig_.value() : nullptr; + } + /// Creates output vector from 'input_' and 'results' according to /// 'identityProjections_' and 'resultProjections_'. If 'mapping' is set to /// nullptr, the children of the output vector will be identical to their diff --git a/velox/exec/ProbeOperatorState.h b/velox/exec/ProbeOperatorState.h index eb0d2474c935..a7b53b48857b 100644 --- a/velox/exec/ProbeOperatorState.h +++ b/velox/exec/ProbeOperatorState.h @@ -36,7 +36,7 @@ enum class ProbeOperatorState { kWaitForBuild = 0, /// The running state that join the probe input with the build table. kRunning = 1, - /// This state has different handlings for hash and nested loop join probe. + /// This state has different handling for hash and nested loop join probe. /// For hash probe, wait for all the peer probe operators to finish processing /// inputs. /// This state only applies when disk spilling is enabled. The last finished diff --git a/velox/exec/RowContainer.cpp b/velox/exec/RowContainer.cpp index fa56374e82b4..0f7e6cb9dd2e 100644 --- a/velox/exec/RowContainer.cpp +++ b/velox/exec/RowContainer.cpp @@ -861,7 +861,7 @@ void RowContainer::extractProbedFlags( if (nullResult) { flatResult->setNull(i, true); } else { - bool probed = bits::isBitSet(rows[i], probedFlagOffset_); + const bool probed = bits::isBitSet(rows[i], probedFlagOffset_); if (setNullForNonProbedRow && !probed) { flatResult->setNull(i, true); } else { diff --git a/velox/exec/RowNumber.cpp b/velox/exec/RowNumber.cpp index 04f289c818ee..2dcf06a84fe4 100644 --- a/velox/exec/RowNumber.cpp +++ b/velox/exec/RowNumber.cpp @@ -402,8 +402,7 @@ void RowNumber::setupHashTableSpiller() { table_->rows(), tableType, std::move(hashBits), - &spillConfig, - spillConfig.maxFileSize); + &spillConfig); } void RowNumber::setupInputSpiller() { @@ -412,11 +411,7 @@ void RowNumber::setupInputSpiller() { // TODO Replace Spiller::Type::kHashJoinProbe. inputSpiller_ = std::make_unique( - Spiller::Type::kHashJoinProbe, - inputType_, - hashBits, - &spillConfig, - spillConfig.maxFileSize); + Spiller::Type::kHashJoinProbe, inputType_, hashBits, &spillConfig); const auto& hashers = table_->hashers(); diff --git a/velox/exec/Spiller.cpp b/velox/exec/Spiller.cpp index 7802d68ec8da..87b7e3bce07e 100644 --- a/velox/exec/Spiller.cpp +++ b/velox/exec/Spiller.cpp @@ -96,8 +96,7 @@ Spiller::Spiller( Type type, RowTypePtr rowType, HashBitRange bits, - const common::SpillConfig* spillConfig, - uint64_t targetFileSize) + const common::SpillConfig* spillConfig) : Spiller( type, nullptr, @@ -108,7 +107,7 @@ Spiller::Spiller( spillConfig->getSpillDirPathCb, spillConfig->updateAndCheckSpillLimitCb, spillConfig->fileNamePrefix, - targetFileSize, + spillConfig->maxFileSize, spillConfig->writeBufferSize, spillConfig->compressionKind, spillConfig->executor, @@ -126,8 +125,7 @@ Spiller::Spiller( RowContainer* container, RowTypePtr rowType, HashBitRange bits, - const common::SpillConfig* spillConfig, - uint64_t targetFileSize) + const common::SpillConfig* spillConfig) : Spiller( type, container, @@ -138,7 +136,7 @@ Spiller::Spiller( spillConfig->getSpillDirPathCb, spillConfig->updateAndCheckSpillLimitCb, spillConfig->fileNamePrefix, - targetFileSize, + spillConfig->maxFileSize, spillConfig->writeBufferSize, spillConfig->compressionKind, spillConfig->executor, diff --git a/velox/exec/Spiller.h b/velox/exec/Spiller.h index 899bc00b69c0..3c4c855d0a78 100644 --- a/velox/exec/Spiller.h +++ b/velox/exec/Spiller.h @@ -71,8 +71,7 @@ class Spiller { Type type, RowTypePtr rowType, HashBitRange bits, - const common::SpillConfig* spillConfig, - uint64_t targetFileSize); + const common::SpillConfig* spillConfig); /// type == Type::kHashJoinBuild Spiller( @@ -80,8 +79,7 @@ class Spiller { RowContainer* container, RowTypePtr rowType, HashBitRange bits, - const common::SpillConfig* spillConfig, - uint64_t targetFileSize); + const common::SpillConfig* spillConfig); Type type() const { return type_; diff --git a/velox/exec/tests/JoinSpillInputBenchmarkBase.cpp b/velox/exec/tests/JoinSpillInputBenchmarkBase.cpp index ec18880a3334..66627e145771 100644 --- a/velox/exec/tests/JoinSpillInputBenchmarkBase.cpp +++ b/velox/exec/tests/JoinSpillInputBenchmarkBase.cpp @@ -41,6 +41,7 @@ void JoinSpillInputBenchmarkBase::setUp() { spillConfig.executor = executor_.get(); spillConfig.compressionKind = stringToCompressionKind(FLAGS_spiller_benchmark_compression_kind); + spillConfig.maxFileSize = FLAGS_spiller_benchmark_max_spill_file_size; spillConfig.maxSpillRunRows = 0; spillConfig.fileCreateConfig = {}; @@ -48,8 +49,7 @@ void JoinSpillInputBenchmarkBase::setUp() { exec::Spiller::Type::kHashJoinProbe, rowType_, HashBitRange{29, 29}, - &spillConfig, - FLAGS_spiller_benchmark_max_spill_file_size); + &spillConfig); spiller_->setPartitionsSpilled({0}); } diff --git a/velox/exec/tests/SpillerTest.cpp b/velox/exec/tests/SpillerTest.cpp index c1bad6e6bf06..c6abde1a77a8 100644 --- a/velox/exec/tests/SpillerTest.cpp +++ b/velox/exec/tests/SpillerTest.cpp @@ -486,12 +486,13 @@ class SpillerTest : public exec::test::RowContainerTestBase { spillConfig.executor = executor(); spillConfig.compressionKind = compressionKind_; spillConfig.maxSpillRunRows = maxSpillRunRows; + spillConfig.maxFileSize = targetFileSize; spillConfig.fileCreateConfig = {}; if (type_ == Spiller::Type::kHashJoinProbe) { // kHashJoinProbe doesn't have associated row container. - spiller_ = std::make_unique( - type_, rowType_, hashBits_, &spillConfig, targetFileSize); + spiller_ = + std::make_unique(type_, rowType_, hashBits_, &spillConfig); } else if ( type_ == Spiller::Type::kOrderByInput || type_ == Spiller::Type::kAggregateInput) { @@ -512,12 +513,7 @@ class SpillerTest : public exec::test::RowContainerTestBase { } else { VELOX_CHECK_EQ(type_, Spiller::Type::kHashJoinBuild); spiller_ = std::make_unique( - type_, - rowContainer_.get(), - rowType_, - hashBits_, - &spillConfig, - targetFileSize); + type_, rowContainer_.get(), rowType_, hashBits_, &spillConfig); } ASSERT_EQ(spiller_->state().maxPartitions(), numPartitions_); ASSERT_FALSE(spiller_->isAllSpilled());