diff --git a/velox/common/memory/MemoryArbitrator.cpp b/velox/common/memory/MemoryArbitrator.cpp
index 5ac55692cdda..1234a2cd55d7 100644
--- a/velox/common/memory/MemoryArbitrator.cpp
+++ b/velox/common/memory/MemoryArbitrator.cpp
@@ -200,7 +200,11 @@ bool MemoryReclaimer::reclaimableBytes(
   if (pool.kind() == MemoryPool::Kind::kLeaf) {
     return false;
   }
+  // Report nothing reclaimable while the pool is below its min capacity.
+  if (pool.capacity() < pool.minCapacity_) {
+    return false;
+  }
   bool reclaimable{false};
   pool.visitChildren([&](MemoryPool* pool) {
     auto reclaimableBytesOpt = pool->reclaimableBytes();
     reclaimable |= reclaimableBytesOpt.has_value();
@@ -219,6 +223,10 @@ uint64_t MemoryReclaimer::reclaim(
   if (pool->kind() == MemoryPool::Kind::kLeaf) {
     return 0;
   }
+  // Don't reclaim from a pool that is below its min capacity.
+  if (pool->capacity() < pool->minCapacity_) {
+    return 0;
+  }
 
   // Sort the child pools based on their reserved memory and reclaim from the
   // child pool with most reservation first.
diff --git a/velox/common/memory/MemoryPool.cpp b/velox/common/memory/MemoryPool.cpp
index c1a3dbb0e772..8d0b6168d1a8 100644
--- a/velox/common/memory/MemoryPool.cpp
+++ b/velox/common/memory/MemoryPool.cpp
@@ -219,11 +219,13 @@ MemoryPool::MemoryPool(
       alignment_(options.alignment),
       parent_(std::move(parent)),
       maxCapacity_(parent_ == nullptr ? options.maxCapacity : kMaxMemory),
+      minCapacity_(parent_ == nullptr ? options.minCapacity : 0),
       trackUsage_(options.trackUsage),
       threadSafe_(options.threadSafe),
       debugEnabled_(options.debugEnabled),
       coreOnAllocationFailureEnabled_(options.coreOnAllocationFailureEnabled) {
   VELOX_CHECK(!isRoot() || !isLeaf());
+  VELOX_CHECK_LE(minCapacity_, maxCapacity_);
   VELOX_CHECK_GT(
       maxCapacity_, 0, "Memory pool {} max capacity can't be zero", name_);
   MemoryAllocator::alignmentCheck(0, alignment_);
@@ -804,11 +806,7 @@ bool MemoryPoolImpl::incrementReservationThreadSafe(
 
   VELOX_CHECK_NULL(parent_);
 
-<<<<<<< HEAD
   ++numCapacityGrowths_;
-=======
-  ++numCapacityGrowth_;
->>>>>>> 23d76d58f ([WIP]add reserved query capacity)
   if (growCapacityCb_(requestor, size)) {
     TestValue::adjust(
         "facebook::velox::memory::MemoryPoolImpl::incrementReservationThreadSafe::AfterGrowCallback",
@@ -1018,19 +1016,30 @@ void MemoryPoolImpl::leaveArbitration() noexcept {
   }
 }
 
-uint64_t MemoryPoolImpl::shrink(uint64_t targetBytes) {
+uint64_t MemoryPoolImpl::shrink(uint64_t targetBytes, bool force) {
   if (parent_ != nullptr) {
-    return parent_->shrink(targetBytes);
+    // Propagate 'force' so that a forced shrink requested through a child pool
+    // is not silently downgraded to a non-forced one at the root.
+    return parent_->shrink(targetBytes, force);
   }
   std::lock_guard l(mutex_);
   // We don't expect to shrink a memory pool without capacity limit.
   VELOX_CHECK_NE(capacity_, kMaxMemory);
-  uint64_t freeBytes = std::max<uint64_t>(0, capacity_ - reservationBytes_);
+  // A forced shrink only has to keep the reserved bytes. A non-forced shrink
+  // also has to keep 'minCapacity_', so it only frees the unused capacity above
+  // that floor.
+  // NOTE: 'capacity_' could be less than 'minCapacity_', in which case a
+  // non-forced shrink frees nothing.
+  int64_t retainedBytes = reservationBytes_;
+  if (!force) {
+    retainedBytes = std::max<int64_t>(retainedBytes, minCapacity_);
+  }
+  uint64_t availableBytes = std::max<int64_t>(0, capacity_ - retainedBytes);
   if (targetBytes != 0) {
-    freeBytes = std::min(targetBytes, freeBytes);
+    availableBytes = std::min(targetBytes, availableBytes);
   }
-  capacity_ -= freeBytes;
-  return freeBytes;
+  capacity_ -= availableBytes;
+  return availableBytes;
 }
 
 uint64_t MemoryPoolImpl::grow(uint64_t bytes) noexcept {
diff --git a/velox/common/memory/MemoryPool.h b/velox/common/memory/MemoryPool.h
index bbbe06352747..fa3a90cb2509 100644
--- a/velox/common/memory/MemoryPool.h
+++ b/velox/common/memory/MemoryPool.h
@@ -127,6 +127,10 @@ class MemoryPool : public std::enable_shared_from_this<MemoryPool> {
     uint16_t alignment{MemoryAllocator::kMaxAlignment};
     /// Specifies the max memory capacity of this memory pool.
     int64_t maxCapacity{kMaxMemory};
+    /// Specifies the min memory capacity of a root memory pool, which must
+    /// not exceed 'maxCapacity'. A non-forced shrink() does not reduce the
+    /// pool capacity below it. It is ignored for a non-root memory pool.
+    int64_t minCapacity{0};
 
     /// If true, tracks the memory usage from the leaf memory pool and aggregate
     /// up to the root memory pool for capacity enforcement. Otherwise there is
@@ -360,7 +364,9 @@ class MemoryPool : public std::enable_shared_from_this<MemoryPool> {
   /// this memory pool's capacity without actually freeing any used memory. The
   /// function returns the actually freed memory capacity in bytes. If
   /// 'targetBytes' is zero, the function frees all the free memory capacity.
-  virtual uint64_t shrink(uint64_t targetBytes = 0) = 0;
+  /// If 'force' is false, the capacity is not shrunk below the pool's min
+  /// capacity. If 'force' is true, the min capacity is ignored.
+  virtual uint64_t shrink(uint64_t targetBytes = 0, bool force = false) = 0;
 
   /// Invoked to increase the memory pool's capacity by 'bytes'. The function
   /// returns the memory pool's capacity after the growth.
@@ -523,6 +529,7 @@ class MemoryPool : public std::enable_shared_from_this<MemoryPool> {
   const uint16_t alignment_;
   const std::shared_ptr<MemoryPool> parent_;
   const int64_t maxCapacity_;
+  const int64_t minCapacity_;
   const bool trackUsage_;
   const bool threadSafe_;
   const bool debugEnabled_;
@@ -644,7 +651,7 @@ class MemoryPoolImpl : public MemoryPool {
       uint64_t maxWaitMs,
       memory::MemoryReclaimer::Stats& stats) override;
 
-  uint64_t shrink(uint64_t targetBytes = 0) override;
+  uint64_t shrink(uint64_t targetBytes = 0, bool force = false) override;
 
   uint64_t grow(uint64_t bytes) noexcept override;
 
diff --git a/velox/common/memory/SharedArbitrator.cpp b/velox/common/memory/SharedArbitrator.cpp
index 3bf7b88488e6..b803112cb959 100644
--- a/velox/common/memory/SharedArbitrator.cpp
+++ b/velox/common/memory/SharedArbitrator.cpp
@@ -259,7 +259,7 @@ uint64_t SharedArbitrator::shrinkCapacity(
   {
     std::lock_guard l(mutex_);
     ++numReleases_;
-    freedBytes = pool->shrink(targetBytes);
+    freedBytes = pool->shrink(targetBytes, true);
     incrementFreeCapacityLocked(freedBytes);
     freeCapacity = freeCapacity_ + freeReservedCapacity_;
     freeReservedCapacity = freeReservedCapacity_;
@@ -430,7 +430,7 @@ bool SharedArbitrator::handleOOM(
   }
   // Free up all the unused capacity from the aborted memory pool and gives back
   // to the arbitrator.
-  incrementFreeCapacity(victim->shrink());
+  incrementFreeCapacity(victim->shrink(0, true));
   return true;
 }
 
@@ -510,7 +510,7 @@ uint64_t SharedArbitrator::reclaimFreeMemoryFromCandidates(
     if (bytesToShrink <= 0) {
       break;
     }
-    freedBytes += candidate.pool->shrink(bytesToShrink);
+    freedBytes += candidate.pool->shrink(bytesToShrink, false);
     if (freedBytes >= targetBytes) {
       break;
     }
@@ -566,7 +566,7 @@ uint64_t SharedArbitrator::reclaimUsedMemoryFromCandidatesByAbort(
     } catch (VeloxRuntimeError&) {
       abort(candidate.pool, std::current_exception());
     }
-    freedBytes += candidate.pool->shrink();
+    freedBytes += candidate.pool->shrink(0, true);
     if (freedBytes >= targetBytes) {
       break;
     }
@@ -588,7 +588,7 @@ uint64_t SharedArbitrator::reclaim(
     MicrosecondTimer reclaimTimer(&reclaimDurationUs);
     const uint64_t oldCapacity = pool->capacity();
     try {
-      freedBytes = pool->shrink(targetBytes);
+      freedBytes = pool->shrink(targetBytes, false);
       if (freedBytes < targetBytes) {
         if (isLocalArbitration) {
           incrementLocalArbitrationCount();
@@ -602,7 +602,7 @@ uint64_t SharedArbitrator::reclaim(
       abort(pool, std::current_exception());
       // Free up all the free capacity from the aborted pool as the associated
       // query has failed at this point.
-      pool->shrink();
+      pool->shrink(0, true);
     }
     const uint64_t newCapacity = pool->capacity();
     VELOX_CHECK_GE(oldCapacity, newCapacity);
diff --git a/velox/dwio/dwrf/test/WriterFlushTest.cpp b/velox/dwio/dwrf/test/WriterFlushTest.cpp
index ae7d6ea3173e..c170b0ed91c8 100644
--- a/velox/dwio/dwrf/test/WriterFlushTest.cpp
+++ b/velox/dwio/dwrf/test/WriterFlushTest.cpp
@@ -203,7 +203,7 @@ class MockMemoryPool : public velox::memory::MemoryPool {
     VELOX_UNSUPPORTED("{} unsupported", __FUNCTION__);
   }
 
-  uint64_t shrink(uint64_t /*unused*/) override {
+  uint64_t shrink(uint64_t /*unused*/, bool /*unused*/) override {
     VELOX_UNSUPPORTED("{} unsupported", __FUNCTION__);
   }
 
diff --git a/velox/exec/Operator.cpp b/velox/exec/Operator.cpp
index 919ef656e9a4..4f4d1d8faf2f 100644
--- a/velox/exec/Operator.cpp
+++ b/velox/exec/Operator.cpp
@@ -654,7 +654,7 @@ uint64_t Operator::MemoryReclaimer::reclaim(
   auto reclaimBytes = memory::MemoryReclaimer::run(
       [&]() {
         op_->reclaim(targetBytes, stats);
-        return pool->shrink(targetBytes);
+        return pool->shrink(targetBytes, false);
       },
       stats);
 