Skip to content

Commit

Permalink
Add memory arbitration test utilities
Browse files Browse the repository at this point in the history
Add test utility to trigger memory arbitration by invoking shrinkPools API
  • Loading branch information
xiaoxmeng committed Feb 29, 2024
1 parent b557ab6 commit 9edc1c8
Show file tree
Hide file tree
Showing 7 changed files with 84 additions and 31 deletions.
8 changes: 6 additions & 2 deletions velox/common/memory/Memory.h
Original file line number Diff line number Diff line change
Expand Up @@ -224,8 +224,12 @@ class MemoryManager {
bool threadSafe = true);

/// Invoked to shrink alive pools to free 'targetBytes' capacity. The function
/// returns the actual freed memory capacity in bytes.
uint64_t shrinkPools(uint64_t targetBytes);
/// returns the actual freed memory capacity in bytes. If 'targetBytes' is
/// zero, then try to reclaim all the memory from the alive pools.
///
/// TODO: add option to enable spilling or not. If spilling is disabled, then
/// the arbitrator might reclaim memory by killing queries.
uint64_t shrinkPools(uint64_t targetBytes = 0);

/// Default unmanaged leaf pool with no threadsafe stats support. Libraries
/// using this method can get a pool that is shared with other threads. The
Expand Down
5 changes: 3 additions & 2 deletions velox/common/memory/MemoryArbitrator.h
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,9 @@ class MemoryArbitrator {

/// Invoked by the memory manager to shrink memory capacity from a given list
/// of memory pools by reclaiming free and used memory. The freed memory
/// capacity is given back to the arbitrator. The function returns the actual
/// freed memory capacity in bytes.
/// capacity is given back to the arbitrator. If 'targetBytes' is zero, then
/// try to reclaim all the memory from 'pools'. The function returns the
/// actual freed memory capacity in bytes.
virtual uint64_t shrinkCapacity(
const std::vector<std::shared_ptr<MemoryPool>>& pools,
uint64_t targetBytes) = 0;
Expand Down
69 changes: 44 additions & 25 deletions velox/common/memory/tests/MockSharedArbitratorTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -590,31 +590,50 @@ TEST_F(MockSharedArbitrationTest, arbitrationFailsTask) {
growOp->freeAll();
}

TEST_F(MockSharedArbitrationTest, shrinkMemory) {
auto task1 = addTask(64 * MB);
auto op1 = addMemoryOp(task1);
auto task2 = addTask(64 * MB);
auto op2 = addMemoryOp(task2);

op1->allocate(64 * MB);
op1->freeAll();
auto bufOp11 = op1->allocate(32 * MB);
auto bufOp12 = op1->allocate(32 * MB);
op1->free(bufOp11);
ASSERT_EQ(op1->pool()->root()->capacity(), 64 * MB);
ASSERT_EQ(op1->pool()->root()->currentBytes(), 32 * MB);

op2->allocate(64 * MB);
op2->freeAll();
auto bufOp21 = op2->allocate(32 * MB);
auto bufOp22 = op2->allocate(32 * MB);
op2->free(bufOp21);
ASSERT_EQ(op2->pool()->root()->capacity(), 64 * MB);
ASSERT_EQ(op2->pool()->root()->currentBytes(), 32 * MB);

ASSERT_EQ(manager_->shrinkPools(kMaxMemory), 128 * MB);
ASSERT_EQ(op1->capacity(), 0);
ASSERT_EQ(op2->capacity(), 0);
TEST_F(MockSharedArbitrationTest, shrinkPools) {
struct {
uint64_t targetBytes;
uint64_t expectedFreedBytes;

std::string debugString() const {
return fmt::format(
"targetBytes: {}, expectedFreedBytes: {}",
succinctBytes(targetBytes),
succinctBytes(expectedFreedBytes));
}
} testSettings[] = {
{0, kMemoryCapacity},
{1UL << 30, kMemoryCapacity},
{1, kMemoryPoolTransferCapacity}};

for (const auto& testData : testSettings) {
SCOPED_TRACE(testData.debugString());

auto task1 = addTask(kMemoryCapacity / 4);
auto* op1 = addMemoryOp(task1);
auto* buf1 = op1->allocate(kMemoryCapacity / 4);
ASSERT_EQ(op1->capacity(), kMemoryCapacity / 4);

auto task2 = addTask(kMemoryCapacity);
auto* op2 = addMemoryOp(task2);
auto* buf2 = op2->allocate(kMemoryCapacity / 4 * 3);
ASSERT_EQ(op2->capacity(), kMemoryCapacity / 4 * 3);

ASSERT_EQ(
manager_->shrinkPools(testData.targetBytes),
testData.expectedFreedBytes);
if (testData.targetBytes == 1) {
ASSERT_GT(op1->capacity(), 0);
ASSERT_GT(op2->capacity(), 0);
} else {
ASSERT_EQ(op1->capacity(), 0);
ASSERT_EQ(op2->capacity(), 0);
}
ASSERT_EQ(
op1->capacity() + op2->capacity() +
arbitrator_->stats().freeCapacityBytes,
arbitrator_->capacity());
}
}

TEST_F(MockSharedArbitrationTest, singlePoolGrowWithoutArbitration) {
Expand Down
6 changes: 5 additions & 1 deletion velox/exec/SharedArbitrator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,11 @@ uint64_t SharedArbitrator::shrinkCapacity(
const std::vector<std::shared_ptr<MemoryPool>>& pools,
uint64_t targetBytes) {
ScopedArbitration scopedArbitration(this);
targetBytes = std::max(memoryPoolTransferCapacity_, targetBytes);
if (targetBytes == 0) {
targetBytes = capacity_;
} else {
targetBytes = std::max(memoryPoolTransferCapacity_, targetBytes);
}
std::vector<Candidate> candidates = getCandidateStats(pools);
auto freedBytes = reclaimFreeMemoryFromCandidates(candidates, targetBytes);
if (freedBytes >= targetBytes) {
Expand Down
1 change: 0 additions & 1 deletion velox/exec/tests/SharedArbitratorTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
#include <functional>
#include <optional>
#include "folly/experimental/EventCount.h"
#include "folly/futures/Barrier.h"
#include "velox/common/base/Exceptions.h"
#include "velox/common/base/tests/GTestUtils.h"
#include "velox/common/memory/MallocAllocator.h"
Expand Down
16 changes: 16 additions & 0 deletions velox/exec/tests/utils/ArbitratorTestUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -349,4 +349,20 @@ QueryTestResult runWriteTask(
return result;
}

void testingRunArbitration(
memory::MemoryPool* pool,
uint64_t targetBytes,
memory::MemoryManager* manager) {
if (manager == nullptr) {
manager = memory::memoryManager();
}
if (pool != nullptr) {
pool->enterArbitration();
manager->shrinkPools(targetBytes);
pool->leaveArbitration();
} else {
manager->shrinkPools(targetBytes);
}
}

} // namespace facebook::velox::exec::test
10 changes: 10 additions & 0 deletions velox/exec/tests/utils/ArbitratorTestUtil.h
Original file line number Diff line number Diff line change
Expand Up @@ -180,4 +180,14 @@ QueryTestResult runWriteTask(
bool enableSpilling,
const RowVectorPtr& expectedResult = nullptr);

/// The function triggers memory arbitration by shrinking memory pools from
/// 'manager' by invoking shrinkPools API. If 'manager' is not set, then it
/// shrinks from the process wide memory manager. If 'pool' is provided, the
/// function puts 'pool' in arbitration state before the arbitration to ease
/// test use. If 'targetBytes' is zero, then reclaims all the memory from
/// 'manager' if possible.
void testingRunArbitration(
memory::MemoryPool* pool = nullptr,
uint64_t targetBytes = 0,
memory::MemoryManager* manager = nullptr);
} // namespace facebook::velox::exec::test

0 comments on commit 9edc1c8

Please sign in to comment.