Skip to content

Commit

Permalink
[WIP]add reserved query capacity
Browse files Browse the repository at this point in the history
  • Loading branch information
xiaoxmeng committed Apr 10, 2024
1 parent 9b83828 commit 901f104
Show file tree
Hide file tree
Showing 6 changed files with 33 additions and 20 deletions.
8 changes: 7 additions & 1 deletion velox/common/memory/MemoryArbitrator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,10 @@ bool MemoryReclaimer::reclaimableBytes(
if (pool.kind() == MemoryPool::Kind::kLeaf) {
return false;
}
bool reclaimable{false};
if (pool.capacity() < pool.minCapacity_) {
return false;
}
bool reclaimable{false};
pool.visitChildren([&](MemoryPool* pool) {
auto reclaimableBytesOpt = pool->reclaimableBytes();
reclaimable |= reclaimableBytesOpt.has_value();
Expand All @@ -219,6 +222,9 @@ uint64_t MemoryReclaimer::reclaim(
if (pool->kind() == MemoryPool::Kind::kLeaf) {
return 0;
}
if (pool->capacity() < pool->minCapacity_) {
return 0;
}

// Sort the child pools based on their reserved memory and reclaim from the
// child pool with most reservation first.
Expand Down
23 changes: 14 additions & 9 deletions velox/common/memory/MemoryPool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -219,11 +219,13 @@ MemoryPool::MemoryPool(
alignment_(options.alignment),
parent_(std::move(parent)),
maxCapacity_(parent_ == nullptr ? options.maxCapacity : kMaxMemory),
minCapacity_(parent_ == nullptr ? options.minCapacity : 0),
trackUsage_(options.trackUsage),
threadSafe_(options.threadSafe),
debugEnabled_(options.debugEnabled),
coreOnAllocationFailureEnabled_(options.coreOnAllocationFailureEnabled) {
VELOX_CHECK(!isRoot() || !isLeaf());
VELOX_CHECK_LE(minCapacity_, maxCapacity_);
VELOX_CHECK_GT(
maxCapacity_, 0, "Memory pool {} max capacity can't be zero", name_);
MemoryAllocator::alignmentCheck(0, alignment_);
Expand Down Expand Up @@ -804,11 +806,7 @@ bool MemoryPoolImpl::incrementReservationThreadSafe(

VELOX_CHECK_NULL(parent_);

<<<<<<< HEAD
++numCapacityGrowths_;
=======
++numCapacityGrowth_;
>>>>>>> 23d76d58f ([WIP]add reserved query capacity)
if (growCapacityCb_(requestor, size)) {
TestValue::adjust(
"facebook::velox::memory::MemoryPoolImpl::incrementReservationThreadSafe::AfterGrowCallback",
Expand Down Expand Up @@ -1018,19 +1016,26 @@ void MemoryPoolImpl::leaveArbitration() noexcept {
}
}

uint64_t MemoryPoolImpl::shrink(uint64_t targetBytes) {
uint64_t MemoryPoolImpl::shrink(uint64_t targetBytes, bool force) {
if (parent_ != nullptr) {
return parent_->shrink(targetBytes);
}
std::lock_guard<std::mutex> l(mutex_);
// We don't expect to shrink a memory pool without capacity limit.
VELOX_CHECK_NE(capacity_, kMaxMemory);
uint64_t freeBytes = std::max<uint64_t>(0, capacity_ - reservationBytes_);
uint64_t availableBytes{0};
if (force) {
availableBytes = std::max<uint64_t>(0, capacity_ - reservationBytes_);
} else {
// NOTE: 'minCapacity_' could be less than 'capacity_'.
availableBytes = std::max<int64_t>(
0, std::min(minCapacity_, capacity_) - reservationBytes_);
}
if (targetBytes != 0) {
freeBytes = std::min(targetBytes, freeBytes);
availableBytes = std::min(targetBytes, availableBytes);
}
capacity_ -= freeBytes;
return freeBytes;
capacity_ -= availableBytes;
return availableBytes;
}

uint64_t MemoryPoolImpl::grow(uint64_t bytes) noexcept {
Expand Down
6 changes: 4 additions & 2 deletions velox/common/memory/MemoryPool.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ class MemoryPool : public std::enable_shared_from_this<MemoryPool> {
uint16_t alignment{MemoryAllocator::kMaxAlignment};
/// Specifies the max memory capacity of this memory pool.
int64_t maxCapacity{kMaxMemory};
int64_t minCapacity{0};

/// If true, tracks the memory usage from the leaf memory pool and aggregate
/// up to the root memory pool for capacity enforcement. Otherwise there is
Expand Down Expand Up @@ -360,7 +361,7 @@ class MemoryPool : public std::enable_shared_from_this<MemoryPool> {
/// this memory pool's capacity without actually freeing any used memory. The
/// function returns the actually freed memory capacity in bytes. If
/// 'targetBytes' is zero, the function frees all the free memory capacity.
virtual uint64_t shrink(uint64_t targetBytes = 0) = 0;
virtual uint64_t shrink(uint64_t targetBytes = 0, bool force = false) = 0;

/// Invoked to increase the memory pool's capacity by 'bytes'. The function
/// returns the memory pool's capacity after the growth.
Expand Down Expand Up @@ -523,6 +524,7 @@ class MemoryPool : public std::enable_shared_from_this<MemoryPool> {
const uint16_t alignment_;
const std::shared_ptr<MemoryPool> parent_;
const int64_t maxCapacity_;
const int64_t minCapacity_;
const bool trackUsage_;
const bool threadSafe_;
const bool debugEnabled_;
Expand Down Expand Up @@ -644,7 +646,7 @@ class MemoryPoolImpl : public MemoryPool {
uint64_t maxWaitMs,
memory::MemoryReclaimer::Stats& stats) override;

uint64_t shrink(uint64_t targetBytes = 0) override;
uint64_t shrink(uint64_t targetBytes = 0, bool force = false) override;

uint64_t grow(uint64_t bytes) noexcept override;

Expand Down
12 changes: 6 additions & 6 deletions velox/common/memory/SharedArbitrator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ uint64_t SharedArbitrator::shrinkCapacity(
{
std::lock_guard<std::mutex> l(mutex_);
++numReleases_;
freedBytes = pool->shrink(targetBytes);
freedBytes = pool->shrink(targetBytes, true);
incrementFreeCapacityLocked(freedBytes);
freeCapacity = freeCapacity_ + freeReservedCapacity_;
freeReservedCapacity = freeReservedCapacity_;
Expand Down Expand Up @@ -430,7 +430,7 @@ bool SharedArbitrator::handleOOM(
}
// Free up all the unused capacity from the aborted memory pool and gives back
// to the arbitrator.
incrementFreeCapacity(victim->shrink());
incrementFreeCapacity(victim->shrink(0, true));
return true;
}

Expand Down Expand Up @@ -510,7 +510,7 @@ uint64_t SharedArbitrator::reclaimFreeMemoryFromCandidates(
if (bytesToShrink <= 0) {
break;
}
freedBytes += candidate.pool->shrink(bytesToShrink);
freedBytes += candidate.pool->shrink(bytesToShrink, false);
if (freedBytes >= targetBytes) {
break;
}
Expand Down Expand Up @@ -566,7 +566,7 @@ uint64_t SharedArbitrator::reclaimUsedMemoryFromCandidatesByAbort(
} catch (VeloxRuntimeError&) {
abort(candidate.pool, std::current_exception());
}
freedBytes += candidate.pool->shrink();
freedBytes += candidate.pool->shrink(0, true);
if (freedBytes >= targetBytes) {
break;
}
Expand All @@ -588,7 +588,7 @@ uint64_t SharedArbitrator::reclaim(
MicrosecondTimer reclaimTimer(&reclaimDurationUs);
const uint64_t oldCapacity = pool->capacity();
try {
freedBytes = pool->shrink(targetBytes);
freedBytes = pool->shrink(targetBytes, false);
if (freedBytes < targetBytes) {
if (isLocalArbitration) {
incrementLocalArbitrationCount();
Expand All @@ -602,7 +602,7 @@ uint64_t SharedArbitrator::reclaim(
abort(pool, std::current_exception());
// Free up all the free capacity from the aborted pool as the associated
// query has failed at this point.
pool->shrink();
pool->shrink(0, true);
}
const uint64_t newCapacity = pool->capacity();
VELOX_CHECK_GE(oldCapacity, newCapacity);
Expand Down
2 changes: 1 addition & 1 deletion velox/dwio/dwrf/test/WriterFlushTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ class MockMemoryPool : public velox::memory::MemoryPool {
VELOX_UNSUPPORTED("{} unsupported", __FUNCTION__);
}

uint64_t shrink(uint64_t /*unused*/) override {
uint64_t shrink(uint64_t /*unused*/, bool /*unused*/) override {
VELOX_UNSUPPORTED("{} unsupported", __FUNCTION__);
}

Expand Down
2 changes: 1 addition & 1 deletion velox/exec/Operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -654,7 +654,7 @@ uint64_t Operator::MemoryReclaimer::reclaim(
auto reclaimBytes = memory::MemoryReclaimer::run(
[&]() {
op_->reclaim(targetBytes, stats);
return pool->shrink(targetBytes);
return pool->shrink(targetBytes, false);
},
stats);

Expand Down

0 comments on commit 901f104

Please sign in to comment.