Skip to content

Commit

Permalink
Use shrink pools by arbitrator for spill test in hash join unit test (f…
Browse files Browse the repository at this point in the history
…acebookincubator#8932)

Summary:
- Add a flag in OperatorTestBase to configure the shared arbitrator for use in unit tests
- Switch to use new api to create query pool and add the memory reclaimer to
   work with shared arbitrator
- Switch hash build to use the test utility that triggers spilling by calling the shared
   arbitrator's shrink pools function
- Improve the shrink pool test utility to ensure the memory pool is always associated
   with its memory manager
- Turn on shared arbitrator in hash join unit test

Eventually, we will make shared arbitrator the default for unit testing.

Pull Request resolved: facebookincubator#8932

Reviewed By: tanjialiang

Differential Revision: D54432889

Pulled By: xiaoxmeng

fbshipit-source-id: 484ccf356a555be837bfa4fa64fb03e6d16c8cb8
  • Loading branch information
xiaoxmeng authored and facebook-github-bot committed Mar 2, 2024
1 parent a6672eb commit f391c02
Show file tree
Hide file tree
Showing 11 changed files with 61 additions and 39 deletions.
14 changes: 14 additions & 0 deletions velox/common/memory/MemoryArbitrator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -457,4 +457,18 @@ MemoryArbitrationContext* memoryArbitrationContext() {
// Tells whether the running thread is currently under memory arbitration,
// which is the case exactly when memoryArbitrationContext() returns a
// non-null context.
bool underMemoryArbitration() {
  if (memoryArbitrationContext() == nullptr) {
    return false;
  }
  return true;
}

void testingRunArbitration(uint64_t targetBytes, MemoryManager* manager) {
if (manager == nullptr) {
manager = memory::memoryManager();
}
manager->shrinkPools(targetBytes);
}

// Test utility that triggers memory arbitration on the memory manager that
// owns 'pool' by calling its shrinkPools() with 'targetBytes'.
//
// 'pool' must be non-null and must be a MemoryPoolImpl (it is downcast to
// reach testingManager()). 'pool' is put into arbitration state for the
// duration of the shrinkPools() call and is always taken out of it again,
// including when shrinkPools() throws.
void testingRunArbitration(MemoryPool* pool, uint64_t targetBytes) {
  // Local scope guard pairing enterArbitration() with leaveArbitration() so
  // the pool is never left stuck in arbitration state by an exception.
  // NOTE(review): this relies on leaveArbitration() not throwing, since it now
  // runs from a destructor — confirm against the MemoryPool interface.
  struct ArbitrationScope {
    explicit ArbitrationScope(MemoryPool* scopedPool) : pool_(scopedPool) {
      pool_->enterArbitration();
    }
    ~ArbitrationScope() {
      pool_->leaveArbitration();
    }
    ArbitrationScope(const ArbitrationScope&) = delete;
    ArbitrationScope& operator=(const ArbitrationScope&) = delete;

    MemoryPool* const pool_;
  };

  ArbitrationScope arbitrationScope(pool);
  static_cast<MemoryPoolImpl*>(pool)->testingManager()->shrinkPools(
      targetBytes);
}
} // namespace facebook::velox::memory
14 changes: 14 additions & 0 deletions velox/common/memory/MemoryArbitrator.h
Original file line number Diff line number Diff line change
Expand Up @@ -383,4 +383,18 @@ MemoryArbitrationContext* memoryArbitrationContext();

/// Returns true if the running thread is under memory arbitration.
bool underMemoryArbitration();

class MemoryManager;

/// Test utility that triggers memory arbitration by shrinking memory pools
/// from 'manager' by invoking its shrinkPools API. If 'manager' is not set
/// (nullptr), then it shrinks from the process wide memory manager. If
/// 'targetBytes' is zero, then reclaims all the memory from 'manager' if
/// possible.
void testingRunArbitration(
uint64_t targetBytes = 0,
MemoryManager* manager = nullptr);

/// Test utility that triggers memory arbitration by shrinking memory pools
/// from the memory manager of 'pool' by invoking its shrinkPools API. 'pool'
/// is put into arbitration state before the shrink and taken out of it
/// afterwards. 'pool' must not be null and must be a MemoryPoolImpl, since the
/// implementation downcasts it to reach its manager. If 'targetBytes' is zero,
/// then reclaims all the memory from that manager if possible.
void testingRunArbitration(MemoryPool* pool, uint64_t targetBytes = 0);
} // namespace facebook::velox::memory
4 changes: 4 additions & 0 deletions velox/common/memory/MemoryPool.h
Original file line number Diff line number Diff line change
Expand Up @@ -679,6 +679,10 @@ class MemoryPoolImpl : public MemoryPool {

void testingSetCapacity(int64_t bytes);

/// Test-only accessor returning the memory manager pointer stored in
/// 'manager_'.
MemoryManager* testingManager() const {
  return this->manager_;
}

/// Test-only accessor returning the memory allocator pointer stored in
/// 'allocator_'.
MemoryAllocator* testingAllocator() const {
  return this->allocator_;
}
Expand Down
6 changes: 4 additions & 2 deletions velox/core/QueryCtx.h
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,10 @@ class QueryCtx {

/// Creates the root memory pool for this query if 'pool_' has not been set.
///
/// The pool is added to the process wide memory manager
/// (memory::memoryManager()) under the name produced by
/// QueryCtx::generatePoolName(queryId), with memory::kMaxMemory passed as its
/// capacity argument and a memory reclaimer from
/// memory::MemoryReclaimer::create() attached. Does nothing if 'pool_' is
/// already set.
void initPool(const std::string& queryId) {
  if (pool_ != nullptr) {
    return;
  }
  // Only the memoryManager() based pool is created. The earlier assignment
  // from deprecatedDefaultMemoryManager() was immediately overwritten by this
  // one, so it only built and discarded an unused root pool.
  pool_ = memory::memoryManager()->addRootPool(
      QueryCtx::generatePoolName(queryId),
      memory::kMaxMemory,
      memory::MemoryReclaimer::create());
}

Expand Down
15 changes: 7 additions & 8 deletions velox/exec/HashBuild.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -452,6 +452,7 @@ bool HashBuild::ensureInputFits(RowVectorPtr& input) {
bool HashBuild::reserveMemory(const RowVectorPtr& input) {
VELOX_CHECK(spillEnabled());

Operator::ReclaimableSectionGuard guard(this);
numSpillRows_ = 0;
numSpillBytes_ = 0;

Expand All @@ -468,9 +469,10 @@ bool HashBuild::reserveMemory(const RowVectorPtr& input) {
if (numRows != 0) {
// Test-only spill path.
if (testingTriggerSpill()) {
numSpillRows_ = std::max<int64_t>(1, numRows / 10);
numSpillBytes_ = numSpillRows_ * outOfLineBytesPerRow;
return false;
memory::testingRunArbitration(pool());
// NOTE: the memory arbitration should have triggered spilling on this
// hash build operator so we return true to indicate have enough memory.
return true;
}

// We check usage from the parent pool to take peers' allocations into
Expand Down Expand Up @@ -522,11 +524,8 @@ bool HashBuild::reserveMemory(const RowVectorPtr& input) {
incrementBytes * 2,
currentUsage * spillConfig_->spillableReservationGrowthPct / 100);

{
Operator::ReclaimableSectionGuard guard(this);
if (pool()->maybeReserve(targetIncrementBytes)) {
return true;
}
if (pool()->maybeReserve(targetIncrementBytes)) {
return true;
}

LOG(WARNING) << "Failed to reserve " << succinctBytes(targetIncrementBytes)
Expand Down
6 changes: 5 additions & 1 deletion velox/exec/tests/HashJoinTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
#include "velox/exec/HashBuild.h"
#include "velox/exec/HashJoinBridge.h"
#include "velox/exec/PlanNodeStats.h"
#include "velox/exec/TableScan.h"
#include "velox/exec/tests/utils/ArbitratorTestUtil.h"
#include "velox/exec/tests/utils/AssertQueryBuilder.h"
#include "velox/exec/tests/utils/Cursor.h"
Expand Down Expand Up @@ -734,6 +733,11 @@ class HashJoinBuilder {

class HashJoinTest : public HiveConnectorTestBase {
protected:
// Test-suite level setup: turns on the velox_testing_enable_arbitration flag
// and then runs the base fixture's suite setup.
//
// The order matters: OperatorTestBase::SetUpTestCase() reads the flag when it
// fills in the memory manager options, so the flag has to be set first.
// NOTE(review): presumably HiveConnectorTestBase::SetUpTestCase() forwards to
// OperatorTestBase::SetUpTestCase() — confirm.
static void SetUpTestCase() {
FLAGS_velox_testing_enable_arbitration = true;
HiveConnectorTestBase::SetUpTestCase();
}

HashJoinTest() : HashJoinTest(TestParam(1)) {}

explicit HashJoinTest(const TestParam& param)
Expand Down
17 changes: 0 additions & 17 deletions velox/exec/tests/utils/ArbitratorTestUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -348,21 +348,4 @@ QueryTestResult runWriteTask(
}
return result;
}

// Test utility that triggers memory arbitration by calling shrinkPools() with
// 'targetBytes' on 'manager', falling back to memory::memoryManager() when
// 'manager' is null. When 'pool' is provided, it is put into arbitration state
// before the shrink and taken out of it afterwards.
void testingRunArbitration(
    memory::MemoryPool* pool,
    uint64_t targetBytes,
    memory::MemoryManager* manager) {
  memory::MemoryManager* const effectiveManager =
      (manager != nullptr) ? manager : memory::memoryManager();

  // Without a pool there is no arbitration state to bracket the call with.
  if (pool == nullptr) {
    effectiveManager->shrinkPools(targetBytes);
    return;
  }

  pool->enterArbitration();
  effectiveManager->shrinkPools(targetBytes);
  pool->leaveArbitration();
}

} // namespace facebook::velox::exec::test
11 changes: 0 additions & 11 deletions velox/exec/tests/utils/ArbitratorTestUtil.h
Original file line number Diff line number Diff line change
Expand Up @@ -179,15 +179,4 @@ QueryTestResult runWriteTask(
const std::string& kHiveConnectorId,
bool enableSpilling,
const RowVectorPtr& expectedResult = nullptr);

/// The function triggers memory arbitration by shrinking memory pools from
/// 'manager' by invoking shrinkPools API. If 'manager' is not set, then it
/// shrinks from the process wide memory manager. If 'pool' is provided, the
/// function puts 'pool' in arbitration state before the arbitration to ease
/// test use. If 'targetBytes' is zero, then reclaims all the memory from
/// 'manager' if possible.
void testingRunArbitration(
memory::MemoryPool* pool = nullptr,
uint64_t targetBytes = 0,
memory::MemoryManager* manager = nullptr);
} // namespace facebook::velox::exec::test
10 changes: 10 additions & 0 deletions velox/exec/tests/utils/OperatorTestBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@

DECLARE_bool(velox_memory_leak_check_enabled);
DECLARE_bool(velox_enable_memory_usage_track_in_default_memory_pool);
// Test-only flag, off by default. OperatorTestBase::SetUpTestCase() checks it
// and, when it is true, creates the testing memory manager with the "SHARED"
// arbitrator kind, an arbitrator capacity, usage leak checking and the
// arbitration state check callback.
DEFINE_bool(
velox_testing_enable_arbitration,
false,
"Enable to turn on arbitration for tests by default");

using namespace facebook::velox::common::testutil;

Expand All @@ -58,6 +62,12 @@ void OperatorTestBase::SetUpTestCase() {
exec::SharedArbitrator::registerFactory();
memory::MemoryManagerOptions options;
options.allocatorCapacity = 8L << 30;
if (FLAGS_velox_testing_enable_arbitration) {
options.arbitratorCapacity = 6L << 30;
options.arbitratorKind = "SHARED";
options.checkUsageLeak = true;
options.arbitrationStateCheckCb = memoryArbitrationStateCheck;
}
memory::MemoryManager::testingSetInstance(options);
asyncDataCache_ = cache::AsyncDataCache::create(memoryManager()->allocator());
cache::AsyncDataCache::setInstance(asyncDataCache_.get());
Expand Down
2 changes: 2 additions & 0 deletions velox/exec/tests/utils/OperatorTestBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
#include "velox/vector/tests/utils/VectorMaker.h"
#include "velox/vector/tests/utils/VectorTestBase.h"

DECLARE_bool(velox_testing_enable_arbitration);

namespace facebook::velox::exec::test {
class OperatorTestBase : public testing::Test,
public velox::test::VectorTestBase {
Expand Down
1 change: 1 addition & 0 deletions velox/expression/tests/FuzzerRunner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ int FuzzerRunner::run(
void FuzzerRunner::runFromGtest(
size_t seed,
const std::unordered_set<std::string>& skipFunctions) {
memory::MemoryManager::testingSetInstance({});
auto signatures = facebook::velox::getFunctionSignatures();
ExpressionFuzzerVerifier(
signatures, seed, getExpressionFuzzerVerifierOptions(skipFunctions))
Expand Down

0 comments on commit f391c02

Please sign in to comment.