Skip to content

Commit

Permalink
Fix recursive memory arbitration caused by parallel spill
Browse files Browse the repository at this point in the history
Summary:
We do parallel spill at plan node level which reclaim memory from background threads which
might trigger memory allocation like file writer stripe flush. The memory allocation might
trigger memory arbitration again if the background thread doesn't carry the memory arbitration
context. The recursive memory arbitration might deadlock the new memory arbitration requests, and
eventually all the driver threads on a worker. We currently have createAsyncMemoryReclaimTask
which carries on the memory arbitration context but it doesn't cover all the cases (this is due to a recent
regression which breaks the dependency from memory module on velox common base as async source
is defined in common base).

The fix is to use createAsyncMemoryReclaimTask to trigger parallel spill. The follow is to have a generic
execution context framework and the async source automatically pickups the current set execution
context. Also with global arbitration optimization, we won't block driver thread execution from different threads.
It also has e2e memory arbitration request timeout mechanism to prevent this.

Reviewed By: kevinwilfong, oerling

Differential Revision: D62512713
  • Loading branch information
xiaoxmeng authored and facebook-github-bot committed Sep 11, 2024
1 parent 485329e commit 0079d4a
Show file tree
Hide file tree
Showing 8 changed files with 139 additions and 62 deletions.
1 change: 1 addition & 0 deletions velox/common/memory/MemoryArbitrator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ std::unique_ptr<MemoryReclaimer> MemoryReclaimer::create() {
uint64_t MemoryReclaimer::run(
const std::function<int64_t()>& func,
Stats& stats) {
VELOX_CHECK(underMemoryArbitration());
uint64_t execTimeUs{0};
int64_t reclaimedBytes{0};
{
Expand Down
9 changes: 6 additions & 3 deletions velox/common/memory/MemoryArbitrator.h
Original file line number Diff line number Diff line change
Expand Up @@ -453,11 +453,14 @@ template <typename Item>
std::shared_ptr<AsyncSource<Item>> createAsyncMemoryReclaimTask(
std::function<std::unique_ptr<Item>()> task) {
auto* arbitrationCtx = memory::memoryArbitrationContext();
VELOX_CHECK_NOT_NULL(arbitrationCtx);
return std::make_shared<AsyncSource<Item>>(
[asyncTask = std::move(task), arbitrationCtx]() -> std::unique_ptr<Item> {
VELOX_CHECK_NOT_NULL(arbitrationCtx);
memory::ScopedMemoryArbitrationContext ctx(arbitrationCtx->requestor);
std::unique_ptr<ScopedMemoryArbitrationContext> restoreArbitrationCtx;
if (arbitrationCtx != nullptr) {
restoreArbitrationCtx =
std::make_unique<ScopedMemoryArbitrationContext>(
arbitrationCtx->requestor);
}
return asyncTask();
});
}
Expand Down
4 changes: 4 additions & 0 deletions velox/common/memory/SharedArbitrator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -556,6 +556,10 @@ uint64_t SharedArbitrator::getCapacityGrowthTarget(
}

bool SharedArbitrator::growCapacity(MemoryPool* pool, uint64_t requestBytes) {
// NOTE: we shouldn't trigger the recursive memory capacity growth under
// memory arbiration context.
VELOX_CHECK(!underMemoryArbitration());

ArbitrationOperation op(
pool, requestBytes, getCapacityGrowthTarget(*pool, requestBytes));
ScopedArbitration scopedArbitration(this, &op);
Expand Down
2 changes: 1 addition & 1 deletion velox/exec/MemoryReclaimer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ uint64_t ParallelMemoryReclaimer::reclaim(
if (candidate.reclaimableBytes == 0) {
continue;
}
reclaimTasks.push_back(std::make_shared<AsyncSource<ReclaimResult>>(
reclaimTasks.push_back(memory::createAsyncMemoryReclaimTask<ReclaimResult>(
[&, reclaimPool = candidate.pool]() {
try {
Stats reclaimStats;
Expand Down
3 changes: 2 additions & 1 deletion velox/exec/Spiller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "velox/exec/Spiller.h"
#include <folly/ScopeGuard.h>
#include "velox/common/base/AsyncSource.h"
#include "velox/common/memory/MemoryArbitrator.h"
#include "velox/common/testutil/TestValue.h"
#include "velox/exec/Aggregate.h"
#include "velox/exec/HashJoinBridge.h"
Expand Down Expand Up @@ -476,7 +477,7 @@ void Spiller::runSpill(bool lastRun) {
if (spillRuns_[partition].rows.empty()) {
continue;
}
writes.push_back(std::make_shared<AsyncSource<SpillStatus>>(
writes.push_back(memory::createAsyncMemoryReclaimTask<SpillStatus>(
[partition, this]() { return writeSpill(partition); }));
if ((writes.size() > 1) && executor_ != nullptr) {
executor_->add([source = writes.back()]() { source->prepare(); });
Expand Down
37 changes: 24 additions & 13 deletions velox/exec/tests/AggregationTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2125,22 +2125,28 @@ DEBUG_ONLY_TEST_F(AggregationTest, reclaimDuringInputProcessing) {

if (testData.expectedReclaimable) {
const auto usedMemory = op->pool()->usedBytes();
op->pool()->reclaim(
folly::Random::oneIn(2) ? 0 : folly::Random::rand32(rng_),
0,
reclaimerStats_);
{
memory::ScopedMemoryArbitrationContext ctx(op->pool());
op->pool()->reclaim(
folly::Random::oneIn(2) ? 0 : folly::Random::rand32(rng_),
0,
reclaimerStats_);
}
ASSERT_GT(reclaimerStats_.reclaimExecTimeUs, 0);
ASSERT_GT(reclaimerStats_.reclaimedBytes, 0);
reclaimerStats_.reset();
// The hash table itself in the grouping set is not cleared so it still
// uses some memory.
ASSERT_LT(op->pool()->usedBytes(), usedMemory);
} else {
VELOX_ASSERT_THROW(
op->reclaim(
folly::Random::oneIn(2) ? 0 : folly::Random::rand32(rng_),
reclaimerStats_),
"");
{
memory::ScopedMemoryArbitrationContext ctx(op->pool());
VELOX_ASSERT_THROW(
op->reclaim(
folly::Random::oneIn(2) ? 0 : folly::Random::rand32(rng_),
reclaimerStats_),
"");
}
}

Task::resume(task);
Expand Down Expand Up @@ -2249,10 +2255,13 @@ DEBUG_ONLY_TEST_F(AggregationTest, reclaimDuringReserve) {
ASSERT_GT(reclaimableBytes, 0);

const auto usedMemory = op->pool()->usedBytes();
op->pool()->reclaim(
folly::Random::oneIn(2) ? 0 : folly::Random::rand32(rng_),
0,
reclaimerStats_);
{
memory::ScopedMemoryArbitrationContext ctx(op->pool());
op->pool()->reclaim(
folly::Random::oneIn(2) ? 0 : folly::Random::rand32(rng_),
0,
reclaimerStats_);
}
ASSERT_GT(reclaimerStats_.reclaimExecTimeUs, 0);
ASSERT_GE(reclaimerStats_.reclaimedBytes, 0);
reclaimerStats_.reset();
Expand Down Expand Up @@ -2492,6 +2501,7 @@ DEBUG_ONLY_TEST_F(AggregationTest, reclaimDuringOutputProcessing) {
if (enableSpilling) {
ASSERT_GT(reclaimableBytes, 0);
const auto usedMemory = op->pool()->usedBytes();
memory::ScopedMemoryArbitrationContext ctx(op->pool());
op->pool()->reclaim(
folly::Random::oneIn(2) ? 0 : folly::Random::rand32(rng_),
0,
Expand All @@ -2503,6 +2513,7 @@ DEBUG_ONLY_TEST_F(AggregationTest, reclaimDuringOutputProcessing) {
reclaimerStats_.reset();
} else {
ASSERT_EQ(reclaimableBytes, 0);
memory::ScopedMemoryArbitrationContext ctx(op->pool());
VELOX_ASSERT_THROW(
op->reclaim(
folly::Random::oneIn(2) ? 0 : folly::Random::rand32(rng_),
Expand Down
123 changes: 87 additions & 36 deletions velox/exec/tests/MemoryReclaimerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
* limitations under the License.
*/

#include "velox/common/base/tests/GTestUtils.h"
#include "velox/common/memory/MemoryArbitrator.h"
#include "velox/common/memory/MemoryPool.h"
#include "velox/exec/tests/utils/OperatorTestBase.h"
Expand Down Expand Up @@ -167,50 +168,64 @@ TEST(ReclaimableSectionGuard, basic) {
ASSERT_TRUE(nonReclaimableSection);
}

TEST_F(MemoryReclaimerTest, parallelMemoryReclaimer) {
class MockMemoryReclaimer : public memory::MemoryReclaimer {
public:
static std::unique_ptr<MemoryReclaimer> create(
bool reclaimable,
uint64_t memoryBytes) {
return std::unique_ptr<MemoryReclaimer>(
new MockMemoryReclaimer(reclaimable, memoryBytes));
}
namespace {
class MockMemoryReclaimer : public memory::MemoryReclaimer {
public:
static std::unique_ptr<MemoryReclaimer> create(
bool reclaimable,
uint64_t memoryBytes,
const std::function<void(memory::MemoryPool*)>& reclaimCallback =
nullptr) {
return std::unique_ptr<MemoryReclaimer>(
new MockMemoryReclaimer(reclaimable, memoryBytes, reclaimCallback));
}

bool reclaimableBytes(const MemoryPool& pool, uint64_t& reclaimableBytes)
const override {
reclaimableBytes = 0;
if (!reclaimable_) {
return false;
}
reclaimableBytes = memoryBytes_;
return true;
bool reclaimableBytes(const MemoryPool& pool, uint64_t& reclaimableBytes)
const override {
reclaimableBytes = 0;
if (!reclaimable_) {
return false;
}
reclaimableBytes = memoryBytes_;
return true;
}

uint64_t reclaim(
MemoryPool* pool,
uint64_t targetBytes,
uint64_t maxWaitMs,
Stats& stats) override {
VELOX_CHECK(reclaimable_);
const uint64_t reclaimedBytes = memoryBytes_;
memoryBytes_ = 0;
return reclaimedBytes;
uint64_t reclaim(
MemoryPool* pool,
uint64_t targetBytes,
uint64_t maxWaitMs,
Stats& stats) override {
VELOX_CHECK(underMemoryArbitration());
VELOX_CHECK(reclaimable_);
if (reclaimCallback_) {
reclaimCallback_(pool);
}
const uint64_t reclaimedBytes = memoryBytes_;
memoryBytes_ = 0;
return reclaimedBytes;
}

uint64_t memoryBytes() const {
return memoryBytes_;
}
uint64_t memoryBytes() const {
return memoryBytes_;
}

private:
MockMemoryReclaimer(bool reclaimable, uint64_t memoryBytes)
: reclaimable_(reclaimable), memoryBytes_(memoryBytes) {}
private:
MockMemoryReclaimer(
bool reclaimable,
uint64_t memoryBytes,
const std::function<void(memory::MemoryPool*)>& reclaimCallback)
: reclaimCallback_(reclaimCallback),
reclaimable_(reclaimable),
memoryBytes_(memoryBytes) {}

bool reclaimable_{false};
int reclaimCount_{0};
uint64_t memoryBytes_{0};
};
const std::function<void(memory::MemoryPool*)> reclaimCallback_;
bool reclaimable_{false};
int reclaimCount_{0};
uint64_t memoryBytes_{0};
};
} // namespace

TEST_F(MemoryReclaimerTest, parallelMemoryReclaimer) {
struct TestReclaimer {
bool reclaimable;
uint64_t memoryBytes;
Expand Down Expand Up @@ -261,3 +276,39 @@ TEST_F(MemoryReclaimerTest, parallelMemoryReclaimer) {
}
}
}

// This test is to verify if the parallel memory reclaimer can prevent recursive
// arbitration.
TEST_F(MemoryReclaimerTest, recursiveArbitrationWithParallelReclaim) {
std::atomic_bool reclaimExecuted{false};
auto rootPool = memory::memoryManager()->addRootPool(
"recursiveArbitrationWithParallelReclaim",
32 << 20,
exec::ParallelMemoryReclaimer::create(executor_.get()));
const auto reclaimCallback = [&](memory::MemoryPool* pool) {
void* buffer = pool->allocate(64 << 20);
pool->free(buffer, 64 << 20);
reclaimExecuted = true;
};
const int numLeafPools = 10;
const int bufferSize = 1 << 20;
std::vector<MockMemoryReclaimer*> memoryReclaimers;
std::vector<std::shared_ptr<MemoryPool>> leafPools;
std::vector<void*> buffers;
for (int i = 0; i < numLeafPools; ++i) {
auto reclaimer =
MockMemoryReclaimer::create(true, bufferSize, reclaimCallback);
leafPools.push_back(
rootPool->addLeafChild(std::to_string(i), true, std::move(reclaimer)));
buffers.push_back(leafPools.back()->allocate(bufferSize));
memoryReclaimers.push_back(
static_cast<MockMemoryReclaimer*>(leafPools.back()->reclaimer()));
}

memory::testingRunArbitration();

for (int i = 0; i < numLeafPools; ++i) {
leafPools[i]->free(buffers[i], bufferSize);
}
ASSERT_TRUE(reclaimExecuted);
}
22 changes: 14 additions & 8 deletions velox/exec/tests/OrderByTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -649,10 +649,13 @@ DEBUG_ONLY_TEST_F(OrderByTest, reclaimDuringInputProcessing) {
}

if (testData.expectedReclaimable) {
op->pool()->reclaim(
folly::Random::oneIn(2) ? 0 : folly::Random::rand32(rng_),
0,
reclaimerStats_);
{
memory::ScopedMemoryArbitrationContext ctx(op->pool());
op->pool()->reclaim(
folly::Random::oneIn(2) ? 0 : folly::Random::rand32(rng_),
0,
reclaimerStats_);
}
ASSERT_GT(reclaimerStats_.reclaimedBytes, 0);
ASSERT_GT(reclaimerStats_.reclaimExecTimeUs, 0);
reclaimerStats_.reset();
Expand Down Expand Up @@ -772,10 +775,13 @@ DEBUG_ONLY_TEST_F(OrderByTest, reclaimDuringReserve) {
ASSERT_TRUE(reclaimable);
ASSERT_GT(reclaimableBytes, 0);

op->pool()->reclaim(
folly::Random::oneIn(2) ? 0 : folly::Random::rand32(rng_),
0,
reclaimerStats_);
{
memory::ScopedMemoryArbitrationContext ctx(op->pool());
op->pool()->reclaim(
folly::Random::oneIn(2) ? 0 : folly::Random::rand32(rng_),
0,
reclaimerStats_);
}
ASSERT_GT(reclaimerStats_.reclaimedBytes, 0);
ASSERT_GT(reclaimerStats_.reclaimExecTimeUs, 0);
reclaimerStats_.reset();
Expand Down

0 comments on commit 0079d4a

Please sign in to comment.