From 9edc1c8b3fabf50db5d80588433ff10c2059fbb2 Mon Sep 17 00:00:00 2001 From: xiaoxmeng Date: Thu, 29 Feb 2024 12:54:35 -0800 Subject: [PATCH] Add memory arbitration test utilities Add test utility to trigger memory arbitration by invoking shrinkPools API --- velox/common/memory/Memory.h | 8 ++- velox/common/memory/MemoryArbitrator.h | 5 +- .../memory/tests/MockSharedArbitratorTest.cpp | 69 ++++++++++++------- velox/exec/SharedArbitrator.cpp | 6 +- velox/exec/tests/SharedArbitratorTest.cpp | 1 - velox/exec/tests/utils/ArbitratorTestUtil.cpp | 16 +++++ velox/exec/tests/utils/ArbitratorTestUtil.h | 10 +++ 7 files changed, 84 insertions(+), 31 deletions(-) diff --git a/velox/common/memory/Memory.h b/velox/common/memory/Memory.h index 18adbce98c2e..87e2cb3ea1fa 100644 --- a/velox/common/memory/Memory.h +++ b/velox/common/memory/Memory.h @@ -224,8 +224,12 @@ class MemoryManager { bool threadSafe = true); /// Invoked to shrink alive pools to free 'targetBytes' capacity. The function - /// returns the actual freed memory capacity in bytes. - uint64_t shrinkPools(uint64_t targetBytes); + /// returns the actual freed memory capacity in bytes. If 'targetBytes' is + /// zero, then try to reclaim all the memory from the alive pools. + /// + /// TODO: add option to enable spilling or not. If spilling is disabled, then + /// the arbitrator might reclaim memory by killing queries. + uint64_t shrinkPools(uint64_t targetBytes = 0); /// Default unmanaged leaf pool with no threadsafe stats support. Libraries /// using this method can get a pool that is shared with other threads. The diff --git a/velox/common/memory/MemoryArbitrator.h b/velox/common/memory/MemoryArbitrator.h index d152f730f845..20371f6bdc26 100644 --- a/velox/common/memory/MemoryArbitrator.h +++ b/velox/common/memory/MemoryArbitrator.h @@ -149,8 +149,9 @@ class MemoryArbitrator { /// Invoked by the memory manager to shrink memory capacity from a given list /// of memory pools by reclaiming free and used memory. The freed memory - /// capacity is given back to the arbitrator. The function returns the actual - /// freed memory capacity in bytes. + /// capacity is given back to the arbitrator. If 'targetBytes' is zero, then + /// try to reclaim all the memory from 'pools'. The function returns the + /// actual freed memory capacity in bytes. virtual uint64_t shrinkCapacity( const std::vector>& pools, uint64_t targetBytes) = 0; diff --git a/velox/common/memory/tests/MockSharedArbitratorTest.cpp b/velox/common/memory/tests/MockSharedArbitratorTest.cpp index 0071a04c1d67..4cb675736618 100644 --- a/velox/common/memory/tests/MockSharedArbitratorTest.cpp +++ b/velox/common/memory/tests/MockSharedArbitratorTest.cpp @@ -590,31 +590,50 @@ TEST_F(MockSharedArbitrationTest, arbitrationFailsTask) { growOp->freeAll(); } -TEST_F(MockSharedArbitrationTest, shrinkMemory) { - auto task1 = addTask(64 * MB); - auto op1 = addMemoryOp(task1); - auto task2 = addTask(64 * MB); - auto op2 = addMemoryOp(task2); - - op1->allocate(64 * MB); - op1->freeAll(); - auto bufOp11 = op1->allocate(32 * MB); - auto bufOp12 = op1->allocate(32 * MB); - op1->free(bufOp11); - ASSERT_EQ(op1->pool()->root()->capacity(), 64 * MB); - ASSERT_EQ(op1->pool()->root()->currentBytes(), 32 * MB); - - op2->allocate(64 * MB); - op2->freeAll(); - auto bufOp21 = op2->allocate(32 * MB); - auto bufOp22 = op2->allocate(32 * MB); - op2->free(bufOp21); - ASSERT_EQ(op2->pool()->root()->capacity(), 64 * MB); - ASSERT_EQ(op2->pool()->root()->currentBytes(), 32 * MB); - - ASSERT_EQ(manager_->shrinkPools(kMaxMemory), 128 * MB); - ASSERT_EQ(op1->capacity(), 0); - ASSERT_EQ(op2->capacity(), 0); +TEST_F(MockSharedArbitrationTest, shrinkPools) { + struct { + uint64_t targetBytes; + uint64_t expectedFreedBytes; + + std::string debugString() const { + return fmt::format( + "targetBytes: {}, expectedFreedBytes: {}", + succinctBytes(targetBytes), + succinctBytes(expectedFreedBytes)); + } + } testSettings[] = { + {0, kMemoryCapacity}, + {1UL << 30, kMemoryCapacity}, + {1, kMemoryPoolTransferCapacity}}; + + for (const auto& testData : testSettings) { + SCOPED_TRACE(testData.debugString()); + + auto task1 = addTask(kMemoryCapacity / 4); + auto* op1 = addMemoryOp(task1); + auto* buf1 = op1->allocate(kMemoryCapacity / 4); + ASSERT_EQ(op1->capacity(), kMemoryCapacity / 4); + + auto task2 = addTask(kMemoryCapacity); + auto* op2 = addMemoryOp(task2); + auto* buf2 = op2->allocate(kMemoryCapacity / 4 * 3); + ASSERT_EQ(op2->capacity(), kMemoryCapacity / 4 * 3); + + ASSERT_EQ( + manager_->shrinkPools(testData.targetBytes), + testData.expectedFreedBytes); + if (testData.targetBytes == 1) { + ASSERT_GT(op1->capacity(), 0); + ASSERT_GT(op2->capacity(), 0); + } else { + ASSERT_EQ(op1->capacity(), 0); + ASSERT_EQ(op2->capacity(), 0); + } + ASSERT_EQ( + op1->capacity() + op2->capacity() + + arbitrator_->stats().freeCapacityBytes, + arbitrator_->capacity()); + } } TEST_F(MockSharedArbitrationTest, singlePoolGrowWithoutArbitration) { diff --git a/velox/exec/SharedArbitrator.cpp b/velox/exec/SharedArbitrator.cpp index ab8fc7950ff1..c859a611d7b5 100644 --- a/velox/exec/SharedArbitrator.cpp +++ b/velox/exec/SharedArbitrator.cpp @@ -209,7 +209,11 @@ uint64_t SharedArbitrator::shrinkCapacity( const std::vector>& pools, uint64_t targetBytes) { ScopedArbitration scopedArbitration(this); - targetBytes = std::max(memoryPoolTransferCapacity_, targetBytes); + if (targetBytes == 0) { + targetBytes = capacity_; + } else { + targetBytes = std::max(memoryPoolTransferCapacity_, targetBytes); + } std::vector candidates = getCandidateStats(pools); auto freedBytes = reclaimFreeMemoryFromCandidates(candidates, targetBytes); if (freedBytes >= targetBytes) { diff --git a/velox/exec/tests/SharedArbitratorTest.cpp b/velox/exec/tests/SharedArbitratorTest.cpp index 3dd632f48dd8..9729f7cd9fa0 100644 --- a/velox/exec/tests/SharedArbitratorTest.cpp +++ b/velox/exec/tests/SharedArbitratorTest.cpp @@ -22,7 +22,6 @@ #include #include #include "folly/experimental/EventCount.h" -#include "folly/futures/Barrier.h" #include "velox/common/base/Exceptions.h" #include "velox/common/base/tests/GTestUtils.h" #include "velox/common/memory/MallocAllocator.h" diff --git a/velox/exec/tests/utils/ArbitratorTestUtil.cpp b/velox/exec/tests/utils/ArbitratorTestUtil.cpp index 61558df69b4c..e72b0743c778 100644 --- a/velox/exec/tests/utils/ArbitratorTestUtil.cpp +++ b/velox/exec/tests/utils/ArbitratorTestUtil.cpp @@ -349,4 +349,20 @@ QueryTestResult runWriteTask( return result; } +void testingRunArbitration( + memory::MemoryPool* pool, + uint64_t targetBytes, + memory::MemoryManager* manager) { + if (manager == nullptr) { + manager = memory::memoryManager(); + } + if (pool != nullptr) { + pool->enterArbitration(); + manager->shrinkPools(targetBytes); + pool->leaveArbitration(); + } else { + manager->shrinkPools(targetBytes); + } +} + } // namespace facebook::velox::exec::test diff --git a/velox/exec/tests/utils/ArbitratorTestUtil.h b/velox/exec/tests/utils/ArbitratorTestUtil.h index 8b99815c3e5e..ddbddd232dc4 100644 --- a/velox/exec/tests/utils/ArbitratorTestUtil.h +++ b/velox/exec/tests/utils/ArbitratorTestUtil.h @@ -180,4 +180,14 @@ QueryTestResult runWriteTask( bool enableSpilling, const RowVectorPtr& expectedResult = nullptr); +/// The function triggers memory arbitration by shrinking memory pools from +/// 'manager' by invoking shrinkPools API. If 'manager' is not set, then it +/// shrinks from the process wide memory manager. If 'pool' is provided, the +/// function puts 'pool' in arbitration state before the arbitration to ease +/// test use. If 'targetBytes' is zero, then reclaims all the memory from +/// 'manager' if possible. +void testingRunArbitration( + memory::MemoryPool* pool = nullptr, + uint64_t targetBytes = 0, + memory::MemoryManager* manager = nullptr); } // namespace facebook::velox::exec::test