Simplify and optimize duplicate row memory allocations (facebookincubator#10865)

Summary:
This PR simplifies duplicate row memory allocation by using dedicated hash
string allocators, one per partition. All the dedicated hash string
allocators are backed by the memory pool of the top-level hash table. This
lets us simplify the memory reservation in the hash build operator and
avoid unnecessary memory fragmentation, so we no longer need to reserve
memory from each hash build operator for the worst case of duplicate row
memory allocations.

This PR also makes freeing the next-row vectors optional, which can save
over 10% of CPU cycles in the hash join list microbenchmark. Freeing the
next-row vectors is not necessary and is mostly a sanity check. The current
implementation also does not guarantee that the next-row vector free is
always executed, because the duplicate row flag is not set correctly. This
PR fixes that by always passing the associated row container when doing
parallel row insertion.
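As a rough illustration of the allocation scheme described above (a minimal
sketch, not the PR's code; it assumes only the HashStringAllocator
constructor taking a memory pool, which appears in the diff below):

```cpp
#include "velox/common/memory/HashStringAllocator.h"

#include <memory>
#include <vector>

using facebook::velox::HashStringAllocator;
using facebook::velox::memory::MemoryPool;

// One dedicated HashStringAllocator per build partition. All of them are
// backed by the same top-level hash table pool, so duplicate-row allocations
// are charged to a single pool instead of per-operator reservations.
std::vector<std::unique_ptr<HashStringAllocator>> makePartitionAllocators(
    MemoryPool* tablePool,
    size_t numPartitions) {
  std::vector<std::unique_ptr<HashStringAllocator>> allocators;
  allocators.reserve(numPartitions);
  for (size_t i = 0; i < numPartitions; ++i) {
    allocators.push_back(std::make_unique<HashStringAllocator>(tablePool));
  }
  return allocators;
}
```

This mirrors the joinInsertAllocators_ setup added to
HashTable::parallelJoinBuild() in the diff below.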

Pull Request resolved: facebookincubator#10865

Reviewed By: kevinwilfong

Differential Revision: D61881526

Pulled By: xiaoxmeng
xiaoxmeng authored and facebook-github-bot committed Sep 11, 2024
1 parent 7e4d626 commit c78ea01
Showing 13 changed files with 171 additions and 206 deletions.
13 changes: 6 additions & 7 deletions velox/common/memory/AllocationPool.h
@@ -18,13 +18,12 @@
#include "velox/common/memory/Memory.h"

namespace facebook::velox::memory {
// A set of Allocations holding the fixed width payload
// rows. The Runs are filled to the end except for the last one. This
// is used for iterating over the payload for rehashing, returning
// results etc. This is used via HashStringAllocator for variable length
// allocation for backing ByteStreams for complex objects. In that case, there
// is a current run that is appended to and when this is exhausted a new run is
// started.
/// A set of Allocations holding the fixed width payload rows. The Runs are
/// filled to the end except for the last one. This is used for iterating over
/// the payload for rehashing, returning results etc. This is used via
/// HashStringAllocator for variable length allocation for backing ByteStreams
/// for complex objects. In that case, there is a current run that is appended
/// to and when this is exhausted a new run is started.
class AllocationPool {
public:
static constexpr int32_t kMinPages = 16;
6 changes: 0 additions & 6 deletions velox/common/memory/HashStringAllocator.cpp
@@ -793,10 +793,4 @@ int64_t HashStringAllocator::checkConsistency() const {
bool HashStringAllocator::isEmpty() const {
return state_.sizeFromPool() == 0 && checkConsistency() == 0;
}

void HashStringAllocator::checkEmpty() const {
VELOX_CHECK_EQ(0, state_.sizeFromPool());
VELOX_CHECK_EQ(0, checkConsistency());
}

} // namespace facebook::velox
6 changes: 0 additions & 6 deletions velox/common/memory/HashStringAllocator.h
@@ -341,12 +341,6 @@ class HashStringAllocator : public StreamArena {
/// checkConsistency() which makes it slow. Do not use in hot paths.
bool isEmpty() const;

/// Throws if 'this' is not empty. Checks consistency of
/// 'this'. This is a fast check for RowContainer users freeing the
/// variable length data they store. Can be used in non-debug
/// builds.
void checkEmpty() const;

std::string toString() const;

/// Effectively makes this immutable while executing f, any attempt to access
2 changes: 1 addition & 1 deletion velox/dwio/common/SelectiveColumnReader.cpp
@@ -158,7 +158,7 @@ const uint64_t* SelectiveColumnReader::shouldMoveNulls(const RowSet& rows) {

void SelectiveColumnReader::setComplexNulls(
const RowSet& rows,
VectorPtr& result) const {
VectorPtr& result) const {
if (!nullsInReadRange_) {
if (result->isNullsWritable()) {
result->clearNulls(0, rows.size());
3 changes: 0 additions & 3 deletions velox/exec/GroupingSet.cpp
@@ -1371,9 +1371,6 @@ void GroupingSet::toIntermediate(
if (intermediateRows_) {
intermediateRows_->eraseRows(folly::Range<char**>(
intermediateGroups_.data(), intermediateGroups_.size()));
if (intermediateRows_->checkFree()) {
intermediateRows_->stringAllocator().checkEmpty();
}
}

// It's unnecessary to call function->clear() to reset the internal states of
67 changes: 12 additions & 55 deletions velox/exec/HashBuild.cpp
@@ -734,15 +734,8 @@ bool HashBuild::finishHashBuild() {

SCOPE_EXIT {
// Make a guard to release the unused memory reservation since we have
// finished the merged table build. The guard makes sure we release the
// memory reserved for other operators even when exceptions are thrown to
// prevent memory leak. We cannot rely on other operator's cleanup mechanism
// because when exceptions are thrown, other operator's cleanup mechanism
// might already have finished.
// finished the merged table build.
pool()->release();
for (auto* build : otherBuilds) {
build->pool()->release();
}
};

// TODO: Re-enable parallel join build with spilling triggered after
@@ -800,59 +793,19 @@ void HashBuild::ensureTableFits(

TestValue::adjust("facebook::velox::exec::HashBuild::ensureTableFits", this);

const auto dupRowOverheadBytes = sizeof(char*) + sizeof(NextRowVector);

uint64_t totalNumRows{0};
uint64_t lastBuildBytesToReserve{0};
bool allowDuplicateRows{false};
{
std::lock_guard<std::mutex> l(mutex_);
const auto numRows = table_->rows()->numRows();
totalNumRows += numRows;
allowDuplicateRows = table_->rows()->nextOffset() != 0;
if (allowDuplicateRows) {
lastBuildBytesToReserve += numRows * dupRowOverheadBytes;
}
totalNumRows += table_->rows()->numRows();
}

for (auto i = 0; i < otherTables.size(); i++) {
for (auto i = 0; i < otherTables.size(); ++i) {
auto& otherTable = otherTables[i];
VELOX_CHECK_NOT_NULL(otherTable);
auto& otherBuild = otherBuilds[i];
const auto& rowContainer = otherTable->rows();
int64_t numRows{0};
{
std::lock_guard<std::mutex> l(otherBuild->mutex_);
numRows = rowContainer->numRows();
}
if (numRows == 0) {
continue;
}

totalNumRows += numRows;
if (!allowDuplicateRows) {
continue;
}

const auto dupRowBytesToReserve = numRows * dupRowOverheadBytes;
if (!isParallelJoin) {
lastBuildBytesToReserve += dupRowBytesToReserve;
continue;
}

Operator::ReclaimableSectionGuard guard(otherBuild);
auto* otherPool = otherBuild->pool();

// Reserve memory for memory allocations for next-row-vectors in
// otherBuild operators if it is parallel join build. Otherwise all
// next-row-vectors shall be allocated from the last build operator.
if (!otherPool->maybeReserve(dupRowBytesToReserve)) {
LOG(WARNING)
<< "Failed to reserve " << succinctBytes(dupRowBytesToReserve)
<< " for for duplicate row memory allocation from non-last memory pool "
<< otherPool->name()
<< ", usage: " << succinctBytes(otherPool->usedBytes())
<< ", reservation: " << succinctBytes(otherPool->reservedBytes());
totalNumRows += otherTable->rows()->numRows();
}
}

@@ -862,16 +815,20 @@

// NOTE: reserve a bit more memory to consider the extra memory used for
// parallel table build operation.
lastBuildBytesToReserve += table_->estimateHashTableSize(totalNumRows) * 1.1;
//
// TODO: make this query configurable.
const uint64_t memoryBytesToReserve =
table_->estimateHashTableSize(totalNumRows) * 1.1;
{
Operator::ReclaimableSectionGuard guard(this);
if (pool()->maybeReserve(lastBuildBytesToReserve)) {
if (pool()->maybeReserve(memoryBytesToReserve)) {
return;
}
}

LOG(WARNING) << "Failed to reserve " << succinctBytes(lastBuildBytesToReserve)
<< " for last build memory pool " << pool()->name()
LOG(WARNING) << "Failed to reserve " << succinctBytes(memoryBytesToReserve)
<< " for join table build from last hash build operator "
<< pool()->name()
<< ", usage: " << succinctBytes(pool()->usedBytes())
<< ", reservation: " << succinctBytes(pool()->reservedBytes());
}
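
For reference, the simplified reservation flow above boils down to a single
best-effort reservation sized from the total row count plus ~10% headroom.
The sketch below restates that pattern in isolation; it is illustrative only,
and the callbacks stand in for Velox's estimateHashTableSize() and
MemoryPool::maybeReserve() rather than being the actual API:

```cpp
#include <cstdint>
#include <functional>
#include <iostream>

// Illustrative-only sketch: estimate the hash table size for all rows, add
// ~10% headroom for the parallel table build, and attempt one reservation
// from the last build operator's pool. On failure, log a warning and
// continue; the build can still proceed and rely on later memory arbitration
// or spilling.
bool reserveForTableBuild(
    uint64_t totalNumRows,
    const std::function<uint64_t(uint64_t)>& estimateHashTableSize,
    const std::function<bool(uint64_t)>& maybeReserve) {
  const auto bytesToReserve =
      static_cast<uint64_t>(estimateHashTableSize(totalNumRows) * 1.1);
  if (maybeReserve(bytesToReserve)) {
    return true;
  }
  std::cerr << "Failed to reserve " << bytesToReserve
            << " bytes for join table build\n";
  return false;
}
```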
79 changes: 54 additions & 25 deletions velox/exec/HashTable.cpp
@@ -735,12 +735,6 @@ void HashTable<ignoreNullKeys>::allocateTables(

template <bool ignoreNullKeys>
void HashTable<ignoreNullKeys>::clear(bool freeTable) {
if (otherTables_.size() > 0) {
rows_->clearNextRowVectors();
for (auto i = 0; i < otherTables_.size(); ++i) {
otherTables_[i]->rows()->clearNextRowVectors();
}
}
for (auto* rowContainer : allRows()) {
rowContainer->clear();
}
@@ -954,6 +948,12 @@ void HashTable<ignoreNullKeys>::parallelJoinBuild() {
}

// The parallel table building step.
VELOX_CHECK(joinInsertAllocators_.empty());
joinInsertAllocators_.reserve(otherTables_.size());
for (int i = 0; i < otherTables_.size(); ++i) {
joinInsertAllocators_.push_back(
std::make_unique<HashStringAllocator>(rows_->pool()));
}
std::vector<std::vector<char*>> overflowPerPartition(numPartitions);
for (auto i = 0; i < numPartitions; ++i) {
buildSteps.push_back(std::make_shared<AsyncSource<bool>>(
@@ -984,7 +984,8 @@
overflows.data(),
hashes.data(),
overflows.size(),
nullptr);
nullptr,
&rows_->stringAllocator());

VELOX_CHECK_EQ(table->rows()->numRows(), table->numParallelBuildRows_);
}
@@ -1052,16 +1053,24 @@ void HashTable<ignoreNullKeys>::buildJoinPartition(
buildPartitionBounds_[partition],
buildPartitionBounds_[partition + 1],
overflow};
auto* rowContainer =
(partition == 0 ? this : otherTables_[partition - 1].get())->rows();
const int partitionNum = partition;
auto* allocator = (partitionNum == 0)
? &rows_->stringAllocator()
: joinInsertAllocators_[partitionNum - 1].get();
VELOX_CHECK_NOT_NULL(allocator);
for (auto i = 0; i < numPartitions; ++i) {
auto* table = i == 0 ? this : otherTables_[i - 1].get();
RowContainerIterator iter;
while (const auto numRows = table->rows_->listPartitionRows(
iter, partition, kBatch, *rowPartitions[i], rows.data())) {
hashRows(folly::Range(rows.data(), numRows), false, hashes);
insertForJoin(
rowContainer, rows.data(), hashes.data(), numRows, &partitionInfo);
table->rows_.get(),
rows.data(),
hashes.data(),
numRows,
&partitionInfo,
allocator);
table->numParallelBuildRows_ += numRows;
}
}
@@ -1077,7 +1086,13 @@ bool HashTable<ignoreNullKeys>::insertBatch(
return false;
}
if (isJoinBuild_) {
insertForJoin(rows(), groups, hashes.data(), numGroups);
insertForJoin(
rows(),
groups,
hashes.data(),
numGroups,
nullptr,
&rows_->stringAllocator());
} else {
insertForGroupBy(groups, hashes.data(), numGroups);
}
@@ -1137,12 +1152,16 @@
}

template <bool ignoreNullKeys>
bool HashTable<ignoreNullKeys>::arrayPushRow(char* row, int32_t index) {
bool HashTable<ignoreNullKeys>::arrayPushRow(
RowContainer* rows,
char* row,
int32_t index,
HashStringAllocator* allocator) {
auto* existingRow = table_[index];
if (existingRow != nullptr) {
if (nextOffset_ > 0) {
hasDuplicates_ = true;
rows_->appendNextRow(existingRow, row);
rows->appendNextRow(existingRow, row, allocator);
}
return false;
}
@@ -1154,10 +1173,11 @@ template <bool ignoreNullKeys>
void HashTable<ignoreNullKeys>::pushNext(
RowContainer* rows,
char* row,
char* next) {
char* next,
HashStringAllocator* allocator) {
VELOX_CHECK_GT(nextOffset_, 0);
hasDuplicates_ = true;
rows->appendNextRow(row, next);
rows->appendNextRow(row, next, allocator);
}

template <bool ignoreNullKeys>
@@ -1168,7 +1188,8 @@ FOLLY_ALWAYS_INLINE void HashTable<ignoreNullKeys>::buildFullProbe(
uint64_t hash,
char* inserted,
bool extraCheck,
TableInsertPartitionInfo* partitionInfo) {
TableInsertPartitionInfo* partitionInfo,
HashStringAllocator* allocator) {
constexpr int32_t kKeyOffset =
-static_cast<int32_t>(sizeof(normalized_key_t));
auto insertFn = [&](int32_t /*row*/, PartitionBoundIndexType index) {
Expand All @@ -1187,7 +1208,7 @@ FOLLY_ALWAYS_INLINE void HashTable<ignoreNullKeys>::buildFullProbe(
if (RowContainer::normalizedKey(group) ==
RowContainer::normalizedKey(inserted)) {
if (nextOffset_ > 0) {
pushNext(rows, group, inserted);
pushNext(rows, group, inserted, allocator);
}
return true;
}
@@ -1204,7 +1225,7 @@
[&](char* group, int32_t /*row*/) {
if (compareKeys(group, inserted)) {
if (nextOffset_ > 0) {
pushNext(rows, group, inserted);
pushNext(rows, group, inserted, allocator);
}
return true;
}
@@ -1224,7 +1245,8 @@ FOLLY_ALWAYS_INLINE void HashTable<ignoreNullKeys>::insertForJoinWithPrefetch(
char** groups,
uint64_t* hashes,
int32_t numGroups,
TableInsertPartitionInfo* partitionInfo) {
TableInsertPartitionInfo* partitionInfo,
HashStringAllocator* allocator) {
auto i = 0;
ProbeState states[kPrefetchSize];
constexpr int32_t kKeyOffset =
@@ -1244,14 +1266,20 @@
for (int32_t j = 0; j < kPrefetchSize; ++j) {
auto index = i + j;
buildFullProbe<isNormailizedKeyMode>(
rows, states[j], hashes[index], groups[index], j != 0, partitionInfo);
rows,
states[j],
hashes[index],
groups[index],
j != 0,
partitionInfo,
allocator);
}
}
for (; i < numGroups; ++i) {
states[0].preProbe(*this, hashes[i], i);
states[0].firstProbe(*this, keyOffset);
buildFullProbe<isNormailizedKeyMode>(
rows, states[0], hashes[i], groups[i], false, partitionInfo);
rows, states[0], hashes[i], groups[i], false, partitionInfo, allocator);
}
}

@@ -1261,23 +1289,24 @@ void HashTable<ignoreNullKeys>::insertForJoin(
char** groups,
uint64_t* hashes,
int32_t numGroups,
TableInsertPartitionInfo* partitionInfo) {
TableInsertPartitionInfo* partitionInfo,
HashStringAllocator* allocator) {
// The insertable rows are in the table, all get put in the hash table or
// array.
if (hashMode_ == HashMode::kArray) {
for (auto i = 0; i < numGroups; ++i) {
auto index = hashes[i];
VELOX_CHECK_LT(index, capacity_);
arrayPushRow(groups[i], index);
arrayPushRow(rows, groups[i], index, allocator);
}
return;
}
if (hashMode_ == HashMode::kNormalizedKey) {
insertForJoinWithPrefetch<true>(
rows, groups, hashes, numGroups, partitionInfo);
rows, groups, hashes, numGroups, partitionInfo, allocator);
} else {
insertForJoinWithPrefetch<false>(
rows, groups, hashes, numGroups, partitionInfo);
rows, groups, hashes, numGroups, partitionInfo, allocator);
}
}
