Skip to content

Commit

Permalink
Avoid additional memory allocation during sort output spill
Browse files Browse the repository at this point in the history
Summary:
The memory arbitration fuzzer is flaky because of an unexpected memory allocation after the sort output spill finishes.
The reason is that when we finish the spill, we set up the merge reader to prepare for reading back the spilled data.
The merge reader might use a non-trivial amount of data, which causes additional memory consumption. In addition,
we should keep the spill critical path as fast as possible. This PR moves the merge reader setup from the spill
path to the first get-output call, and adds unit tests. It also improves the fuzzer test logging a bit to help debugging.

Differential Revision: D64072660
  • Loading branch information
xiaoxmeng authored and facebook-github-bot committed Oct 8, 2024
1 parent 683e359 commit 5302a8c
Show file tree
Hide file tree
Showing 4 changed files with 163 additions and 43 deletions.
23 changes: 19 additions & 4 deletions velox/exec/SortBuffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@ SortBuffer::~SortBuffer() {
}

void SortBuffer::addInput(const VectorPtr& input) {
velox::common::testutil::TestValue::adjust(
"facebook::velox::exec::SortBuffer::addInput", this);

VELOX_CHECK(!noMoreInput_);
ensureInputFits(input);

Expand Down Expand Up @@ -348,6 +351,7 @@ void SortBuffer::prepareOutput(vector_size_t maxOutputRows) {
if (spiller_ != nullptr) {
spillSources_.resize(maxOutputRows);
spillSourceRows_.resize(maxOutputRows);
prepareOutputWithSpill();
}

VELOX_CHECK_GT(output_->size(), 0);
Expand Down Expand Up @@ -414,11 +418,22 @@ void SortBuffer::getOutputWithSpill() {

void SortBuffer::finishSpill() {
VELOX_CHECK_NULL(spillMerger_);
SpillPartitionSet spillPartitionSet;
spiller_->finishSpill(spillPartitionSet);
VELOX_CHECK_EQ(spillPartitionSet.size(), 1);
spillMerger_ = spillPartitionSet.begin()->second->createOrderedReader(
VELOX_CHECK(spillPartitionSet_.empty());
spiller_->finishSpill(spillPartitionSet_);
VELOX_CHECK_EQ(spillPartitionSet_.size(), 1);
}

// Creates 'spillMerger_' on the output path if it has not been created yet.
// finishSpill() only records the finished spill partition in
// 'spillPartitionSet_'; the ordered reader over that partition is built here
// so that the reader setup stays off the spill path.
void SortBuffer::prepareOutputWithSpill() {
  VELOX_CHECK_NOT_NULL(spiller_);

  const bool mergerCreated = (spillMerger_ != nullptr);
  if (!mergerCreated) {
    // Exactly one spill partition is expected to be pending at this point.
    VELOX_CHECK_EQ(spillPartitionSet_.size(), 1);
    auto& spillPartition = spillPartitionSet_.begin()->second;
    spillMerger_ = spillPartition->createOrderedReader(
        spillConfig_->readBufferSize, pool(), spillStats_);
    // The partition has been handed over to the reader; drop it so that later
    // calls see an empty set.
    spillPartitionSet_.clear();
    return;
  }

  // The merger already exists, so the partition set must have been consumed
  // by an earlier call.
  VELOX_CHECK(spillPartitionSet_.empty());
}

} // namespace facebook::velox::exec
4 changes: 4 additions & 0 deletions velox/exec/SortBuffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ class SortBuffer {
void updateEstimatedOutputRowSize();
// Invoked to initialize or reset the reusable output buffer to get output.
void prepareOutput(vector_size_t maxOutputRows);
// Invoked to initialize reader to read the spilled data from storage for
// output processing.
void prepareOutputWithSpill();
void getOutputWithoutSpill();
void getOutputWithSpill();
// Spill during input stage.
Expand Down Expand Up @@ -116,6 +119,7 @@ class SortBuffer {
// sort key columns are stored first then the non-sorted data columns.
RowTypePtr spillerStoreType_;
std::unique_ptr<Spiller> spiller_;
SpillPartitionSet spillPartitionSet_;
// Used to merge the sorted runs from in-memory rows and spilled rows on disk.
std::unique_ptr<TreeOfLosers<SpillMergeStream>> spillMerger_;
// Records the source rows to copy to 'output_' in order.
Expand Down
8 changes: 3 additions & 5 deletions velox/exec/fuzzer/MemoryArbitrationFuzzer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,15 +85,13 @@ class MemoryArbitrationFuzzer {

struct Stats {
size_t successCount{0};
size_t failureCount{0};
size_t oomCount{0};
size_t abortCount{0};

void print() const {
std::stringstream ss;
ss << "Success count = " << successCount
<< ", failure count = " << failureCount
<< ". OOM count = " << oomCount << " Abort count = " << abortCount;
ss << "Success count = " << successCount << ". OOM count = " << oomCount
<< " Abort count = " << abortCount;
LOG(INFO) << ss.str();
}
};
Expand Down Expand Up @@ -714,7 +712,7 @@ void MemoryArbitrationFuzzer::verify() {
} else if (e.errorCode() == error_code::kMemAborted.c_str()) {
++lockedStats->abortCount;
} else {
++lockedStats->failureCount;
LOG(ERROR) << "Unexpected exception: " << e.what();
std::rethrow_exception(std::current_exception());
}
}
Expand Down
171 changes: 137 additions & 34 deletions velox/exec/tests/SortBufferTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -456,24 +456,9 @@ TEST_F(SortBufferTest, spill) {
}
}

DEBUG_ONLY_TEST_F(SortBufferTest, reserveMemoryGetOutput) {
DEBUG_ONLY_TEST_F(SortBufferTest, spillDuringInput) {
auto spillDirectory = exec::test::TempDirectoryPath::create();
auto spillConfig = common::SpillConfig(
[&]() -> const std::string& { return spillDirectory->getPath(); },
[&](uint64_t) {},
"0.0.0",
1000,
0,
1 << 20,
executor_.get(),
100,
100000,
0,
0,
0,
0,
0,
"none");
const auto spillConfig = getSpillConfig(spillDirectory->getPath());
folly::Synchronized<common::SpillStats> spillStats;
auto sortBuffer = std::make_unique<SortBuffer>(
inputType_,
Expand All @@ -485,34 +470,152 @@ DEBUG_ONLY_TEST_F(SortBufferTest, reserveMemoryGetOutput) {
&spillConfig,
&spillStats);

const int numInputs = 10;
const int numSpilledInputs = 10 / 2;
std::atomic_int processedInputs{0};
SCOPED_TESTVALUE_SET(
"facebook::velox::exec::SortBuffer::addInput",
std::function<void(SortBuffer*)>(([&](SortBuffer* sortBuffer) {
if (processedInputs++ != numSpilledInputs) {
return;
}
ASSERT_GT(sortBuffer->pool()->usedBytes(), 0);
sortBuffer->spill();
ASSERT_EQ(sortBuffer->pool()->usedBytes(), 0);
})));

const std::shared_ptr<memory::MemoryPool> fuzzerPool =
memory::memoryManager()->addLeafPool("spillSource");
memory::memoryManager()->addLeafPool("spillDuringInput");
VectorFuzzer fuzzer({.vectorSize = 1024}, fuzzerPool.get());
uint64_t totalNumInput{0};

ASSERT_EQ(memory::spillMemoryPool()->stats().usedBytes, 0);
const auto peakSpillMemoryUsage =
memory::spillMemoryPool()->stats().peakBytes;

TestScopedSpillInjection scopedSpillInjection(0);
for (int i = 0; i < 3; ++i) {
for (int i = 0; i < numInputs; ++i) {
sortBuffer->addInput(fuzzer.fuzzRow(inputType_));
}
sortBuffer->noMoreInput();

ASSERT_FALSE(spillStats.rlock()->empty());
ASSERT_GT(spillStats.rlock()->spilledRows, 0);
ASSERT_EQ(spillStats.rlock()->spilledRows, numInputs * 1024);
ASSERT_GT(spillStats.rlock()->spilledBytes, 0);
ASSERT_EQ(spillStats.rlock()->spilledPartitions, 1);
ASSERT_EQ(spillStats.rlock()->spilledFiles, 2);

ASSERT_EQ(memory::spillMemoryPool()->stats().usedBytes, 0);
if (memory::spillMemoryPool()->trackUsage()) {
ASSERT_GT(memory::spillMemoryPool()->stats().peakBytes, 0);
ASSERT_GE(
memory::spillMemoryPool()->stats().peakBytes, peakSpillMemoryUsage);
}
}

std::atomic_bool noMoreInput{false};
DEBUG_ONLY_TEST_F(SortBufferTest, spillDuringOutput) {
auto spillDirectory = exec::test::TempDirectoryPath::create();
const auto spillConfig = getSpillConfig(spillDirectory->getPath());
folly::Synchronized<common::SpillStats> spillStats;
auto sortBuffer = std::make_unique<SortBuffer>(
inputType_,
sortColumnIndices_,
sortCompareFlags_,
pool_.get(),
&nonReclaimableSection_,
prefixSortConfig_,
&spillConfig,
&spillStats);

const int numInputs = 10;
SCOPED_TESTVALUE_SET(
"facebook::velox::exec::SortBuffer::noMoreInput",
std::function<void(SortBuffer*)>(
([&](SortBuffer* sortBuffer) { noMoreInput.store(true); })));
std::function<void(SortBuffer*)>(([&](SortBuffer* sortBuffer) {
ASSERT_GT(sortBuffer->pool()->usedBytes(), 0);
sortBuffer->spill();
ASSERT_EQ(sortBuffer->pool()->usedBytes(), 0);
})));

std::atomic_int numInputs{0};
SCOPED_TESTVALUE_SET(
"facebook::velox::common::memory::MemoryPoolImpl::maybeReserve",
std::function<void(memory::MemoryPoolImpl*)>(
([&](memory::MemoryPoolImpl* pool) {
if (noMoreInput) {
++numInputs;
}
})));
const std::shared_ptr<memory::MemoryPool> fuzzerPool =
memory::memoryManager()->addLeafPool("spillDuringOutput");
VectorFuzzer fuzzer({.vectorSize = 1024}, fuzzerPool.get());
uint64_t totalNumInput{0};

ASSERT_EQ(memory::spillMemoryPool()->stats().usedBytes, 0);
const auto peakSpillMemoryUsage =
memory::spillMemoryPool()->stats().peakBytes;

for (int i = 0; i < numInputs; ++i) {
sortBuffer->addInput(fuzzer.fuzzRow(inputType_));
}
sortBuffer->noMoreInput();
sortBuffer->getOutput(10000);
ASSERT_EQ(numInputs, 1);

ASSERT_FALSE(spillStats.rlock()->empty());
ASSERT_GT(spillStats.rlock()->spilledRows, 0);
ASSERT_EQ(spillStats.rlock()->spilledRows, numInputs * 1024);
ASSERT_GT(spillStats.rlock()->spilledBytes, 0);
ASSERT_EQ(spillStats.rlock()->spilledPartitions, 1);
ASSERT_EQ(spillStats.rlock()->spilledFiles, 1);

ASSERT_EQ(memory::spillMemoryPool()->stats().usedBytes, 0);
if (memory::spillMemoryPool()->trackUsage()) {
ASSERT_GT(memory::spillMemoryPool()->stats().peakBytes, 0);
ASSERT_GE(
memory::spillMemoryPool()->stats().peakBytes, peakSpillMemoryUsage);
}
}

// Counts how often the 'MemoryPoolImpl::maybeReserve' test value fires once
// the 'SortBuffer::noMoreInput' test value has fired, covering noMoreInput()
// and a single getOutput() call. Runs with spilling disabled and enabled:
// exactly one reservation is expected with spilling enabled and none without.
DEBUG_ONLY_TEST_F(SortBufferTest, reserveMemoryGetOutput) {
  for (const bool spillEnabled : {false, true}) {
    SCOPED_TRACE(fmt::format("spillEnabled {}", spillEnabled));

    auto tempDirectory = exec::test::TempDirectoryPath::create();
    const auto spillConfig = getSpillConfig(tempDirectory->getPath());
    // A null spill config is passed when spilling is disabled.
    const auto* spillConfigPtr = spillEnabled ? &spillConfig : nullptr;
    folly::Synchronized<common::SpillStats> spillStats;
    auto sortBuffer = std::make_unique<SortBuffer>(
        inputType_,
        sortColumnIndices_,
        sortCompareFlags_,
        pool_.get(),
        &nonReclaimableSection_,
        prefixSortConfig_,
        spillConfigPtr,
        &spillStats);

    const std::shared_ptr<memory::MemoryPool> inputPool =
        memory::memoryManager()->addLeafPool("reserveMemoryGetOutput");
    VectorFuzzer vectorFuzzer({.vectorSize = 1024}, inputPool.get());

    // Feed all input before installing the hooks so that only reservations
    // made after the noMoreInput hook fires are counted.
    constexpr int kNumInputs = 10;
    for (int inputIdx = 0; inputIdx < kNumInputs; ++inputIdx) {
      sortBuffer->addInput(vectorFuzzer.fuzzRow(inputType_));
    }

    std::atomic_bool noMoreInputSeen{false};
    SCOPED_TESTVALUE_SET(
        "facebook::velox::exec::SortBuffer::noMoreInput",
        std::function<void(SortBuffer*)>(
            [&](SortBuffer* /*unused*/) { noMoreInputSeen = true; }));

    std::atomic_int numReserves{0};
    SCOPED_TESTVALUE_SET(
        "facebook::velox::common::memory::MemoryPoolImpl::maybeReserve",
        std::function<void(memory::MemoryPoolImpl*)>(
            [&](memory::MemoryPoolImpl* /*unused*/) {
              if (noMoreInputSeen.load()) {
                numReserves.fetch_add(1);
              }
            }));

    sortBuffer->noMoreInput();
    // Request an extremely large number of rows so that getOutput() is called
    // only once, which avoids test flakiness.
    sortBuffer->getOutput(1'000'000);
    if (!spillEnabled) {
      ASSERT_EQ(numReserves, 0);
    } else {
      ASSERT_EQ(numReserves, 1);
    }
  }
}

TEST_F(SortBufferTest, emptySpill) {
Expand Down

0 comments on commit 5302a8c

Please sign in to comment.