diff --git a/velox/exec/HashBuild.cpp b/velox/exec/HashBuild.cpp index a1805e69cd2e..7ec2a62c505a 100644 --- a/velox/exec/HashBuild.cpp +++ b/velox/exec/HashBuild.cpp @@ -845,8 +845,9 @@ bool HashBuild::finishHashBuild() { isInputFromSpill() ? spillConfig()->startPartitionBit : BaseHashTable::kNoSpillInputStartPartitionBit); addRuntimeStats(); - if (joinBridge_->setHashTable( - std::move(table_), std::move(spillPartitions), joinHasNullKeys_)) { + joinBridge_->setHashTable( + std::move(table_), std::move(spillPartitions), joinHasNullKeys_); + if (spillEnabled()) { intermediateStateCleared_ = true; spillGroup_->restart(); } @@ -1222,11 +1223,11 @@ void HashBuild::reclaim( } bool HashBuild::nonReclaimableState() const { - // Apart from being in the nonReclaimable section, - // its also not reclaimable if: - // 1) the hash table has been built by the last build thread (inidicated - // by state_) - // 2) the last build operator has transferred ownership of 'this' operator's + // Apart from being in the nonReclaimable section, it's also not reclaimable + // if: + // 1) the hash table has been built by the last build thread (indicated by + // state_) + // 2) the last build operator has transferred ownership of 'this operator's // intermediate state (table_ and spiller_) to itself // 3) it has completed spilling before reaching either of the previous // two states. diff --git a/velox/exec/HashBuild.h b/velox/exec/HashBuild.h index 84366f5cff2b..ae736e8ffffb 100644 --- a/velox/exec/HashBuild.h +++ b/velox/exec/HashBuild.h @@ -264,8 +264,8 @@ class HashBuild final : public Operator { // The row type used for hash table build and disk spilling. RowTypePtr tableType_; - // Used to serialize access to intermediate state variables (like 'table_' and - // 'spiller_'). This is only required when variables are accessed + // Used to serialize access to internal state including 'table_' and + // 'spiller_'. This is only required when variables are accessed // concurrently, that is, when a thread tries to close the operator while // another thread is building the hash table. Refer to 'close()' and // finishHashBuild()' for more details. @@ -316,8 +316,8 @@ class HashBuild final : public Operator { uint64_t numSpillBytes_{0}; // This can be nullptr if either spilling is not allowed or it has been - // trsnaferred to the last hash build operator while in kWaitForBuild state or - // it has been cleared to setup a new one for recursive spilling. + // transferred to the last hash build operator while in kWaitForBuild state or + // it has been cleared to set up a new one for recursive spilling. std::unique_ptr spiller_; // Used to read input from previously spilled data for restoring. diff --git a/velox/exec/HashJoinBridge.cpp b/velox/exec/HashJoinBridge.cpp index 464f7dbff100..99af07cb9cb7 100644 --- a/velox/exec/HashJoinBridge.cpp +++ b/velox/exec/HashJoinBridge.cpp @@ -29,7 +29,7 @@ void HashJoinBridge::addBuilder() { ++numBuilders_; } -bool HashJoinBridge::setHashTable( +void HashJoinBridge::setHashTable( std::unique_ptr table, SpillPartitionSet spillPartitionSet, bool hasNullKeys) { @@ -37,7 +37,6 @@ bool HashJoinBridge::setHashTable( auto spillPartitionIdSet = toSpillPartitionIdSet(spillPartitionSet); - bool hasSpillData; std::vector promises; { std::lock_guard l(mutex_); @@ -64,12 +63,30 @@ bool HashJoinBridge::setHashTable( std::move(spillPartitionIdSet), hasNullKeys); restoringSpillPartitionId_.reset(); - - hasSpillData = !spillPartitionSets_.empty(); promises = std::move(promises_); } notify(std::move(promises)); - return hasSpillData; +} + +void HashJoinBridge::setSpilledHashTable(SpillPartitionSet spillPartitionSet) { + VELOX_CHECK( + !spillPartitionSet.empty(), "Spilled table partitions can't be empty"); + std::shared_ptr tableToFree; + { + std::lock_guard l(mutex_); + VELOX_CHECK(started_); + VELOX_CHECK(buildResult_.has_value()); + VELOX_CHECK(restoringSpillShards_.empty()); + VELOX_CHECK(!restoringSpillPartitionId_.has_value()); + + for (auto& partitionEntry : spillPartitionSet) { + const auto id = partitionEntry.first; + VELOX_CHECK_EQ(spillPartitionSets_.count(id), 0); + spillPartitionSets_.emplace(id, std::move(partitionEntry.second)); + } + // Free up the memory resource of the spilled hash table. + tableToFree = std::move(buildResult_->table); + } } void HashJoinBridge::setAntiJoinHasNullKeys() { @@ -131,10 +148,8 @@ bool HashJoinBridge::probeFinished() { spillPartitionSets_.begin()->second->split(numBuilders_); VELOX_CHECK_EQ(restoringSpillShards_.size(), numBuilders_); spillPartitionSets_.erase(spillPartitionSets_.begin()); - promises = std::move(promises_); - } else { - VELOX_CHECK(promises_.empty()); } + promises = std::move(promises_); } notify(std::move(promises)); return hasSpillInput; @@ -148,14 +163,16 @@ std::optional HashJoinBridge::spillInputOrFuture( VELOX_DCHECK( !restoringSpillPartitionId_.has_value() || !buildResult_.has_value()); + if (buildResult_.has_value()) { + VELOX_CHECK(!restoringSpillPartitionId_.has_value()); + promises_.emplace_back("HashJoinBridge::spillInputOrFuture"); + *future = promises_.back().getSemiFuture(); + return std::nullopt; + } if (!restoringSpillPartitionId_.has_value()) { - if (spillPartitionSets_.empty()) { - return HashJoinBridge::SpillInput{}; - } else { - promises_.emplace_back("HashJoinBridge::spillInputOrFuture"); - *future = promises_.back().getSemiFuture(); - return std::nullopt; - } + VELOX_CHECK(spillPartitionSets_.empty()); + VELOX_CHECK(restoringSpillShards_.empty()); + return HashJoinBridge::SpillInput{}; } VELOX_CHECK(!restoringSpillShards_.empty()); auto spillShard = std::move(restoringSpillShards_.back()); @@ -175,17 +192,27 @@ uint64_t HashJoinMemoryReclaimer::reclaim( uint64_t targetBytes, uint64_t maxWaitMs, memory::MemoryReclaimer::Stats& stats) { + bool hasReclaimedFromBuild{false}; + bool hasReclaimedFromProbe{false}; uint64_t reclaimedBytes{0}; pool->visitChildren([&](memory::MemoryPool* child) { VELOX_CHECK_EQ(child->kind(), memory::MemoryPool::Kind::kLeaf); - // The hash probe operator do not support memory reclaim. - if (!isHashBuildMemoryPool(*child)) { - return true; + const bool isBuild = isHashBuildMemoryPool(*child); + if (isBuild) { + if (!hasReclaimedFromBuild) { + hasReclaimedFromBuild = true; + reclaimedBytes = child->reclaim(targetBytes, maxWaitMs, stats); + } + // We only need to reclaim from any one of the hash build operators + // which will reclaim from all the peer hash build operators. + return !hasReclaimedFromProbe; + } + + if (!hasReclaimedFromProbe) { + hasReclaimedFromProbe = true; + reclaimedBytes = child->reclaim(targetBytes, maxWaitMs, stats); } - // We only need to reclaim from any one of the hash build operators - // which will reclaim from all the peer hash build operators. - reclaimedBytes = child->reclaim(targetBytes, maxWaitMs, stats); - return false; + return !hasReclaimedFromBuild; }); return reclaimedBytes; } @@ -193,4 +220,8 @@ uint64_t HashJoinMemoryReclaimer::reclaim( bool isHashBuildMemoryPool(const memory::MemoryPool& pool) { return folly::StringPiece(pool.name()).endsWith("HashBuild"); } + +bool isHashProbeMemoryPool(const memory::MemoryPool& pool) { + return folly::StringPiece(pool.name()).endsWith("HashProbe"); +} } // namespace facebook::velox::exec diff --git a/velox/exec/HashJoinBridge.h b/velox/exec/HashJoinBridge.h index 899f8fa5c63f..1ea517f25367 100644 --- a/velox/exec/HashJoinBridge.h +++ b/velox/exec/HashJoinBridge.h @@ -35,15 +35,21 @@ class HashJoinBridge : public JoinBridge { /// HashBuild operators to parallelize the restoring operation. void addBuilder(); + /// Invoked by the build operator to set the built hash table. /// 'spillPartitionSet' contains the spilled partitions while building - /// 'table'. The function returns true if there is spill data to restore - /// after HashProbe operators process 'table', otherwise false. This only - /// applies if the disk spilling is enabled. - bool setHashTable( + /// 'table' which only applies if the disk spilling is enabled. + void setHashTable( std::unique_ptr table, SpillPartitionSet spillPartitionSet, bool hasNullKeys); + /// Invoked by the probe operator to set the spilled hash table while the + /// probing. The function puts the spilled table partitions into + /// 'spillPartitionSets_' stack as well as clearing the table set in + /// 'buildResult_' to free up the memory resource held by the spilled table. + /// This only applies if the disk spilling is enabled. + void setSpilledHashTable(SpillPartitionSet spillPartitionSet); + void setAntiJoinHasNullKeys(); /// Represents the result of HashBuild operators: a hash table, an optional @@ -75,8 +81,7 @@ class HashJoinBridge : public JoinBridge { /// HashBuild operators. If HashProbe operator calls this early, 'future' will /// be set to wait asynchronously, otherwise the built table along with /// optional spilling related information will be returned in HashBuildResult. - std::optional tableOrFuture( - ContinueFuture* FOLLY_NONNULL future); + std::optional tableOrFuture(ContinueFuture* future); /// Invoked by HashProbe operator after finishes probing the built table to /// set one of the previously spilled partition to restore. The HashBuild @@ -102,8 +107,7 @@ class HashJoinBridge : public JoinBridge { /// If HashBuild operator calls this early, 'future' will be set to wait /// asynchronously. If there is no more spill data to restore, then /// 'spillPartition' will be set to null in the returned SpillInput. - std::optional spillInputOrFuture( - ContinueFuture* FOLLY_NONNULL future); + std::optional spillInputOrFuture(ContinueFuture* future); private: uint32_t numBuilders_{0}; @@ -156,4 +160,8 @@ class HashJoinMemoryReclaimer final : public MemoryReclaimer { /// Returns true if 'pool' is a hash build operator's memory pool. The check is /// currently based on the pool name. bool isHashBuildMemoryPool(const memory::MemoryPool& pool); + +/// Returns true if 'pool' is a hash probe operator's memory pool. The check is +/// currently based on the pool name. +bool isHashProbeMemoryPool(const memory::MemoryPool& pool); } // namespace facebook::velox::exec diff --git a/velox/exec/tests/HashJoinBridgeTest.cpp b/velox/exec/tests/HashJoinBridgeTest.cpp index 9ad04c331a5f..46c48c4b9608 100644 --- a/velox/exec/tests/HashJoinBridgeTest.cpp +++ b/velox/exec/tests/HashJoinBridgeTest.cpp @@ -150,15 +150,17 @@ TEST_P(HashJoinBridgeTest, withoutSpill) { auto joinBridge = createJoinBridge(); // Can't call any other APIs except addBuilder() before start a join bridge // first. - ASSERT_ANY_THROW( - joinBridge->setHashTable(createFakeHashTable(), {}, false)); - ASSERT_ANY_THROW(joinBridge->setAntiJoinHasNullKeys()); - ASSERT_ANY_THROW(joinBridge->probeFinished()); - ASSERT_ANY_THROW(joinBridge->tableOrFuture(&futures[0])); - ASSERT_ANY_THROW(joinBridge->spillInputOrFuture(&futures[0])); + VELOX_ASSERT_THROW( + joinBridge->setHashTable(createFakeHashTable(), {}, false), ""); + VELOX_ASSERT_THROW(joinBridge->setAntiJoinHasNullKeys(), ""); + VELOX_ASSERT_THROW(joinBridge->probeFinished(), ""); + VELOX_ASSERT_THROW(joinBridge->tableOrFuture(&futures[0]), ""); + VELOX_ASSERT_THROW(joinBridge->spillInputOrFuture(&futures[0]), ""); + VELOX_ASSERT_THROW( + joinBridge->setSpilledHashTable(makeFakeSpillPartitionSet(0)), ""); // Can't start a bridge without any builders. - ASSERT_ANY_THROW(joinBridge->start()); + VELOX_ASSERT_THROW(joinBridge->start(), ""); joinBridge = createJoinBridge(); @@ -176,13 +178,13 @@ TEST_P(HashJoinBridgeTest, withoutSpill) { BaseHashTable* rawTable = nullptr; if (hasNullKeys) { joinBridge->setAntiJoinHasNullKeys(); - ASSERT_ANY_THROW(joinBridge->setAntiJoinHasNullKeys()); + VELOX_ASSERT_THROW(joinBridge->setAntiJoinHasNullKeys(), ""); } else { auto table = createFakeHashTable(); rawTable = table.get(); joinBridge->setHashTable(std::move(table), {}, false); - ASSERT_ANY_THROW( - joinBridge->setHashTable(createFakeHashTable(), {}, false)); + VELOX_ASSERT_THROW( + joinBridge->setHashTable(createFakeHashTable(), {}, false), ""); } for (int32_t i = 0; i < numProbers_; ++i) { @@ -209,15 +211,24 @@ TEST_P(HashJoinBridgeTest, withoutSpill) { } } - // Verify builder will see no spill input. + // Verify builder will wait for probe side finish signal even if there is no + // spill input. auto inputOr = joinBridge->spillInputOrFuture(&futures[0]); + ASSERT_FALSE(inputOr.has_value()); + ASSERT_TRUE(futures[0].valid()); + + // Probe side completion. + ASSERT_FALSE(joinBridge->probeFinished()); + + futures[0].wait(); + + futures = createEmptyFutures(1); + inputOr = joinBridge->spillInputOrFuture(&futures[0]); ASSERT_TRUE(inputOr.has_value()); ASSERT_FALSE(futures[0].valid()); ASSERT_TRUE(inputOr.value().spillPartition == nullptr); - // Probe side completion. - ASSERT_FALSE(joinBridge->probeFinished()); - ASSERT_ANY_THROW(joinBridge->probeFinished()); + VELOX_ASSERT_THROW(joinBridge->probeFinished(), ""); } } @@ -276,7 +287,7 @@ TEST_P(HashJoinBridgeTest, withSpill) { } else { numSpilledPartitions += spillPartitionSet.size(); spillPartitionIdSet = toSpillPartitionIdSet(spillPartitionSet); - hasMoreSpill = joinBridge->setHashTable( + joinBridge->setHashTable( createFakeHashTable(), std::move(spillPartitionSet), false); } @@ -483,29 +494,32 @@ TEST_P(HashJoinBridgeTest, multiThreading) { } } -TEST_P(HashJoinBridgeTest, isHashBuildMemoryPool) { +TEST_P(HashJoinBridgeTest, isHashJoinMemoryPools) { auto root = memory::memoryManager()->addRootPool("isHashBuildMemoryPool"); struct { std::string poolName; - bool expectedHashBuildPool; + bool isHashBuildPool; + bool isHashProbePool; std::string debugString() const { return fmt::format( - "poolName: {}, expectedHashBuildPool: {}", + "poolName: {}, isHashBuildPool: {}, isHashProbePool: {}", poolName, - expectedHashBuildPool); + isHashBuildPool, + isHashProbePool); } } testSettings[] = { - {"HashBuild", true}, - {"HashBuildd", false}, - {"hHashBuild", true}, - {"hHashProbe", false}, - {"HashProbe", false}, - {"HashProbeh", false}}; + {"HashBuild", true, false}, + {"HashBuildd", false, false}, + {"hHashBuild", false, false}, + {"hHashProbe", false, false}, + {"HashProbe", false, true}, + {"HashProbeh", false, false}}; for (const auto& testData : testSettings) { SCOPED_TRACE(testData.debugString()); const auto pool = root->addLeafChild(testData.poolName); - ASSERT_EQ(isHashBuildMemoryPool(*pool), testData.expectedHashBuildPool); + ASSERT_EQ(isHashBuildMemoryPool(*pool), testData.isHashBuildPool); + ASSERT_EQ(isHashProbeMemoryPool(*pool), testData.isHashProbePool); } } diff --git a/velox/exec/tests/HashJoinTest.cpp b/velox/exec/tests/HashJoinTest.cpp index 08dc5b8f7604..0cebc7cd9a3c 100644 --- a/velox/exec/tests/HashJoinTest.cpp +++ b/velox/exec/tests/HashJoinTest.cpp @@ -24,7 +24,6 @@ #include "velox/exec/HashBuild.h" #include "velox/exec/HashJoinBridge.h" #include "velox/exec/PlanNodeStats.h" -#include "velox/exec/TableScan.h" #include "velox/exec/tests/utils/ArbitratorTestUtil.h" #include "velox/exec/tests/utils/AssertQueryBuilder.h" #include "velox/exec/tests/utils/Cursor.h" @@ -5498,7 +5497,6 @@ DEBUG_ONLY_TEST_F(HashJoinTest, reclaimDuringAllocation) { "facebook::velox::exec::Driver::runInternal::addInput", std::function(([&](Operator* testOp) { if (testOp->operatorType() != "HashBuild") { - ASSERT_FALSE(testOp->canReclaim()); return; } op = testOp; @@ -5746,10 +5744,10 @@ DEBUG_ONLY_TEST_F(HashJoinTest, reclaimDuringWaitForProbe) { concat(probeType_->names(), buildType_->names())) .planNode(); + std::atomic_bool driverWaitFlag{true}; folly::EventCount driverWait; - auto driverWaitKey = driverWait.prepareWait(); + std::atomic_bool testWaitFlag{true}; folly::EventCount testWait; - auto testWaitKey = testWait.prepareWait(); Operator* op; std::atomic injectSpillOnce{true}; @@ -5780,7 +5778,6 @@ DEBUG_ONLY_TEST_F(HashJoinTest, reclaimDuringWaitForProbe) { if (testOp->operatorType() != "HashProbe") { return; } - ASSERT_FALSE(testOp->canReclaim()); if (!injectOnce.exchange(false)) { return; } @@ -5790,11 +5787,12 @@ DEBUG_ONLY_TEST_F(HashJoinTest, reclaimDuringWaitForProbe) { const bool reclaimable = op->reclaimableBytes(reclaimableBytes); ASSERT_TRUE(reclaimable); ASSERT_GT(reclaimableBytes, 0); - testWait.notify(); + testWaitFlag = false; + testWait.notifyAll(); auto* driver = testOp->testingOperatorCtx()->driver(); auto task = driver->task(); SuspendedSection suspendedSection(driver); - driverWait.wait(driverWaitKey); + driverWait.await([&]() { return !driverWaitFlag.load(); }); }))); std::thread taskThread([&]() { @@ -5817,7 +5815,7 @@ DEBUG_ONLY_TEST_F(HashJoinTest, reclaimDuringWaitForProbe) { .run(); }); - testWait.wait(testWaitKey); + testWait.await([&]() { return !testWaitFlag.load(); }); ASSERT_TRUE(op != nullptr); auto task = op->testingOperatorCtx()->task(); auto taskPauseWait = task->requestPause(); @@ -5840,7 +5838,8 @@ DEBUG_ONLY_TEST_F(HashJoinTest, reclaimDuringWaitForProbe) { // No reclaim as the build operator is not in building table state. ASSERT_EQ(usedMemoryBytes, op->pool()->currentBytes()); - driverWait.notify(); + driverWaitFlag = false; + driverWait.notifyAll(); Task::resume(task); task.reset(); @@ -7075,165 +7074,6 @@ DEBUG_ONLY_TEST_F(HashJoinTest, reclaimDuringJoinTableBuild) { waitForAllTasksToBeDeleted(); } -// This test is to reproduce a race condition that memory arbitrator tries to -// reclaim from a set of hash build operators in which the last hash build -// operator has finished. -DEBUG_ONLY_TEST_F(HashJoinTest, raceBetweenRaclaimAndJoinFinish) { - std::unique_ptr memoryManager = createMemoryManager(); - const auto& arbitrator = memoryManager->arbitrator(); - auto rowType = ROW({ - {"c0", INTEGER()}, - {"c1", INTEGER()}, - {"c2", VARCHAR()}, - }); - // Build a large vector to trigger memory arbitration. - fuzzerOpts_.vectorSize = 10'000; - std::vector vectors = createVectors(2, rowType, fuzzerOpts_); - createDuckDbTable(vectors); - - std::shared_ptr joinQueryCtx = - newQueryCtx(memoryManager, executor_, kMemoryCapacity); - - auto planNodeIdGenerator = std::make_shared(); - core::PlanNodeId planNodeId; - auto plan = PlanBuilder(planNodeIdGenerator) - .values(vectors, false) - .project({"c0 AS t0", "c1 AS t1", "c2 AS t2"}) - .hashJoin( - {"t0"}, - {"u0"}, - PlanBuilder(planNodeIdGenerator) - .values(vectors, true) - .project({"c0 AS u0", "c1 AS u1", "c2 AS u2"}) - .planNode(), - "", - {"t1"}, - core::JoinType::kAnti) - .capturePlanNodeId(planNodeId) - .planNode(); - - std::atomic waitForBuildFinishFlag{true}; - folly::EventCount waitForBuildFinishEvent; - std::atomic lastBuildDriver{nullptr}; - std::atomic task{nullptr}; - std::atomic isLastBuildFirstChildPool{false}; - SCOPED_TESTVALUE_SET( - "facebook::velox::exec::HashBuild::finishHashBuild", - std::function([&](exec::HashBuild* buildOp) { - lastBuildDriver = buildOp->testingOperatorCtx()->driver(); - // Checks if the last build memory pool is the first build pool in its - // parent node pool. It is used to check the test result. - int buildPoolIndex{0}; - buildOp->pool()->parent()->visitChildren([&](memory::MemoryPool* pool) { - if (pool == buildOp->pool()) { - return false; - } - if (isHashBuildMemoryPool(*pool)) { - ++buildPoolIndex; - } - return true; - }); - isLastBuildFirstChildPool = (buildPoolIndex == 0); - task = lastBuildDriver.load()->task().get(); - waitForBuildFinishFlag = false; - waitForBuildFinishEvent.notifyAll(); - })); - - std::atomic waitForReclaimFlag{true}; - folly::EventCount waitForReclaimEvent; - SCOPED_TESTVALUE_SET( - "facebook::velox::exec::Driver::runInternal", - std::function([&](Driver* driver) { - auto* op = driver->findOperator(planNodeId); - if (op->operatorType() != "HashBuild" && - op->operatorType() != "HashProbe") { - return; - } - - // Suspend hash probe driver to wait for the test triggered reclaim to - // finish. - if (op->operatorType() == "HashProbe") { - op->pool()->reclaimer()->enterArbitration(); - waitForReclaimEvent.await( - [&]() { return !waitForReclaimFlag.load(); }); - op->pool()->reclaimer()->leaveArbitration(); - } - - // Check if we have reached to the last hash build operator or not. The - // testvalue callback will set the last build driver. - if (lastBuildDriver == nullptr) { - return; - } - - // Suspend all the remaining hash build drivers until the test triggered - // reclaim finish. - op->pool()->reclaimer()->enterArbitration(); - waitForReclaimEvent.await([&]() { return !waitForReclaimFlag.load(); }); - op->pool()->reclaimer()->leaveArbitration(); - })); - - const int numDrivers = 4; - std::thread queryThread([&]() { - const auto spillDirectory = exec::test::TempDirectoryPath::create(); - AssertQueryBuilder(plan, duckDbQueryRunner_) - .maxDrivers(numDrivers) - .queryCtx(joinQueryCtx) - .spillDirectory(spillDirectory->path) - .config(core::QueryConfig::kSpillEnabled, true) - .config(core::QueryConfig::kJoinSpillEnabled, true) - .assertResults( - "SELECT c1 FROM tmp WHERE c0 NOT IN (SELECT c0 FROM tmp)"); - }); - - // Wait for the last hash build operator to start building the hash table. - waitForBuildFinishEvent.await([&] { return !waitForBuildFinishFlag.load(); }); - ASSERT_TRUE(lastBuildDriver != nullptr); - ASSERT_TRUE(task != nullptr); - - // Wait until the last build driver gets removed from the task after finishes. - while (task.load()->numFinishedDrivers() != 1) { - bool foundLastBuildDriver{false}; - task.load()->testingVisitDrivers([&](Driver* driver) { - if (driver == lastBuildDriver) { - foundLastBuildDriver = true; - } - }); - if (!foundLastBuildDriver) { - break; - } - } - - // Reclaim from the task, and we can't reclaim anything as we don't support - // spill after hash table built. - memory::MemoryReclaimer::Stats stats; - const uint64_t oldCapacity = joinQueryCtx->pool()->capacity(); - task.load()->pool()->shrink(); - task.load()->pool()->reclaim(1'000, 0, stats); - // If the last build memory pool is first child of its parent memory pool, - // then memory arbitration (or join node memory pool) will reclaim from the - // last build operator first which simply quits as the driver has gone. If - // not, we expect to get numNonReclaimableAttempts from any one of the - // remaining hash build operator. - if (isLastBuildFirstChildPool) { - ASSERT_EQ(stats.numNonReclaimableAttempts, 0); - } else { - ASSERT_EQ(stats.numNonReclaimableAttempts, 1); - } - // Make sure we don't leak memory capacity since we reclaim from task pool - // directly. - static_cast(task.load()->pool()) - ->testingSetCapacity(oldCapacity); - waitForReclaimFlag = false; - waitForReclaimEvent.notifyAll(); - - queryThread.join(); - - waitForAllTasksToBeDeleted(); - ASSERT_EQ(arbitrator->stats().numFailures, 0); - ASSERT_EQ(arbitrator->stats().numReclaimedBytes, 0); - ASSERT_EQ(arbitrator->stats().numReserves, 1); -} - DEBUG_ONLY_TEST_F(HashJoinTest, joinBuildSpillError) { const int kMemoryCapacity = 32 << 20; // Set a small memory capacity to trigger spill.