From c78ea01026656a6476fbbede892eee5cce56dd97 Mon Sep 17 00:00:00 2001 From: xiaoxmeng Date: Tue, 10 Sep 2024 18:12:49 -0700 Subject: [PATCH] Simplify and optimize duplicate row memory allocations (#10865) Summary: This PR simplifies the duplicate row memory allocations by using dedicated hash string allocator with one per each partition. All the dedicated hash string allocators are all backup by the memory pool of the top level hash table. Then we can simplify the memory reservation in hash build operator and avoid the unnecessary memory fragements. Now we have to reserve memory from each of the hash build operator for the worst case of duplicate row memory allocations. This PR also make free next row vectors optional which can helps save >10% cpu cycles in hash join list microbenchmark. The free next row vectors are not necessary and most for sanity check. Also current implementation does not guaratee the next row vectors free always get executed as the duplicate row flag is not set correctly. This PR also fixes this by always passing the associated row container when do parallel row insertion. Pull Request resolved: https://github.com/facebookincubator/velox/pull/10865 Reviewed By: kevinwilfong Differential Revision: D61881526 Pulled By: xiaoxmeng --- velox/common/memory/AllocationPool.h | 13 ++- velox/common/memory/HashStringAllocator.cpp | 6 -- velox/common/memory/HashStringAllocator.h | 6 -- velox/dwio/common/SelectiveColumnReader.cpp | 2 +- velox/exec/GroupingSet.cpp | 3 - velox/exec/HashBuild.cpp | 67 +++------------- velox/exec/HashTable.cpp | 79 +++++++++++++------ velox/exec/HashTable.h | 54 +++++++------ velox/exec/Operator.cpp | 6 -- velox/exec/RowContainer.cpp | 36 +++------ velox/exec/RowContainer.h | 27 +++---- .../HashJoinListResultBenchmark.cpp | 55 ++++++++----- velox/exec/tests/RowContainerTest.cpp | 23 +++--- 13 files changed, 171 insertions(+), 206 deletions(-) diff --git a/velox/common/memory/AllocationPool.h b/velox/common/memory/AllocationPool.h index ff3fee4442f0..1449be6b9af9 100644 --- a/velox/common/memory/AllocationPool.h +++ b/velox/common/memory/AllocationPool.h @@ -18,13 +18,12 @@ #include "velox/common/memory/Memory.h" namespace facebook::velox::memory { -// A set of Allocations holding the fixed width payload -// rows. The Runs are filled to the end except for the last one. This -// is used for iterating over the payload for rehashing, returning -// results etc. This is used via HashStringAllocator for variable length -// allocation for backing ByteStreams for complex objects. In that case, there -// is a current run that is appended to and when this is exhausted a new run is -// started. +/// A set of Allocations holding the fixed width payload ows. The Runs are +/// filled to the end except for the last one. This is used for iterating over +/// the payload for rehashing, returning results etc. This is used via +/// HashStringAllocator for variable length allocation for backing ByteStreams +/// for complex objects. In that case, there is a current run that is appended +/// to and when this is exhausted a new run is started. class AllocationPool { public: static constexpr int32_t kMinPages = 16; diff --git a/velox/common/memory/HashStringAllocator.cpp b/velox/common/memory/HashStringAllocator.cpp index 1a938a0dd571..8cb6781a16e3 100644 --- a/velox/common/memory/HashStringAllocator.cpp +++ b/velox/common/memory/HashStringAllocator.cpp @@ -793,10 +793,4 @@ int64_t HashStringAllocator::checkConsistency() const { bool HashStringAllocator::isEmpty() const { return state_.sizeFromPool() == 0 && checkConsistency() == 0; } - -void HashStringAllocator::checkEmpty() const { - VELOX_CHECK_EQ(0, state_.sizeFromPool()); - VELOX_CHECK_EQ(0, checkConsistency()); -} - } // namespace facebook::velox diff --git a/velox/common/memory/HashStringAllocator.h b/velox/common/memory/HashStringAllocator.h index 4604b7565c4f..485da69966d7 100644 --- a/velox/common/memory/HashStringAllocator.h +++ b/velox/common/memory/HashStringAllocator.h @@ -341,12 +341,6 @@ class HashStringAllocator : public StreamArena { /// checkConsistency() which makes it slow. Do not use in hot paths. bool isEmpty() const; - /// Throws if 'this' is not empty. Checks consistency of - /// 'this'. This is a fast check for RowContainer users freeing the - /// variable length data they store. Can be used in non-debug - /// builds. - void checkEmpty() const; - std::string toString() const; /// Effectively makes this immutable while executing f, any attempt to access diff --git a/velox/dwio/common/SelectiveColumnReader.cpp b/velox/dwio/common/SelectiveColumnReader.cpp index c7a3ad67a4e2..c2285e69a207 100644 --- a/velox/dwio/common/SelectiveColumnReader.cpp +++ b/velox/dwio/common/SelectiveColumnReader.cpp @@ -158,7 +158,7 @@ const uint64_t* SelectiveColumnReader::shouldMoveNulls(const RowSet& rows) { void SelectiveColumnReader::setComplexNulls( const RowSet& rows, - VectorPtr& result) const { + VectorPtr& result) const { if (!nullsInReadRange_) { if (result->isNullsWritable()) { result->clearNulls(0, rows.size()); diff --git a/velox/exec/GroupingSet.cpp b/velox/exec/GroupingSet.cpp index 02be45e15748..5e478bef8094 100644 --- a/velox/exec/GroupingSet.cpp +++ b/velox/exec/GroupingSet.cpp @@ -1371,9 +1371,6 @@ void GroupingSet::toIntermediate( if (intermediateRows_) { intermediateRows_->eraseRows(folly::Range( intermediateGroups_.data(), intermediateGroups_.size())); - if (intermediateRows_->checkFree()) { - intermediateRows_->stringAllocator().checkEmpty(); - } } // It's unnecessary to call function->clear() to reset the internal states of diff --git a/velox/exec/HashBuild.cpp b/velox/exec/HashBuild.cpp index ea14000e2886..6a4f6e3ab1d8 100644 --- a/velox/exec/HashBuild.cpp +++ b/velox/exec/HashBuild.cpp @@ -734,15 +734,8 @@ bool HashBuild::finishHashBuild() { SCOPE_EXIT { // Make a guard to release the unused memory reservation since we have - // finished the merged table build. The guard makes sure we release the - // memory reserved for other operators even when exceptions are thrown to - // prevent memory leak. We cannot rely on other operator's cleanup mechanism - // because when exceptions are thrown, other operator's cleanup mechanism - // might already have finished. + // finished the merged table build. pool()->release(); - for (auto* build : otherBuilds) { - build->pool()->release(); - } }; // TODO: Re-enable parallel join build with spilling triggered after @@ -800,59 +793,19 @@ void HashBuild::ensureTableFits( TestValue::adjust("facebook::velox::exec::HashBuild::ensureTableFits", this); - const auto dupRowOverheadBytes = sizeof(char*) + sizeof(NextRowVector); - uint64_t totalNumRows{0}; - uint64_t lastBuildBytesToReserve{0}; - bool allowDuplicateRows{false}; { std::lock_guard l(mutex_); - const auto numRows = table_->rows()->numRows(); - totalNumRows += numRows; - allowDuplicateRows = table_->rows()->nextOffset() != 0; - if (allowDuplicateRows) { - lastBuildBytesToReserve += numRows * dupRowOverheadBytes; - } + totalNumRows += table_->rows()->numRows(); } - for (auto i = 0; i < otherTables.size(); i++) { + for (auto i = 0; i < otherTables.size(); ++i) { auto& otherTable = otherTables[i]; VELOX_CHECK_NOT_NULL(otherTable); auto& otherBuild = otherBuilds[i]; - const auto& rowContainer = otherTable->rows(); - int64_t numRows{0}; { std::lock_guard l(otherBuild->mutex_); - numRows = rowContainer->numRows(); - } - if (numRows == 0) { - continue; - } - - totalNumRows += numRows; - if (!allowDuplicateRows) { - continue; - } - - const auto dupRowBytesToReserve = numRows * dupRowOverheadBytes; - if (!isParallelJoin) { - lastBuildBytesToReserve += dupRowBytesToReserve; - continue; - } - - Operator::ReclaimableSectionGuard guard(otherBuild); - auto* otherPool = otherBuild->pool(); - - // Reserve memory for memory allocations for next-row-vectors in - // otherBuild operators if it is parallel join build. Otherwise all - // next-row-vectors shall be allocated from the last build operator. - if (!otherPool->maybeReserve(dupRowBytesToReserve)) { - LOG(WARNING) - << "Failed to reserve " << succinctBytes(dupRowBytesToReserve) - << " for for duplicate row memory allocation from non-last memory pool " - << otherPool->name() - << ", usage: " << succinctBytes(otherPool->usedBytes()) - << ", reservation: " << succinctBytes(otherPool->reservedBytes()); + totalNumRows += otherTable->rows()->numRows(); } } @@ -862,16 +815,20 @@ void HashBuild::ensureTableFits( // NOTE: reserve a bit more memory to consider the extra memory used for // parallel table build operation. - lastBuildBytesToReserve += table_->estimateHashTableSize(totalNumRows) * 1.1; + // + // TODO: make this query configurable. + const uint64_t memoryBytesToReserve = + table_->estimateHashTableSize(totalNumRows) * 1.1; { Operator::ReclaimableSectionGuard guard(this); - if (pool()->maybeReserve(lastBuildBytesToReserve)) { + if (pool()->maybeReserve(memoryBytesToReserve)) { return; } } - LOG(WARNING) << "Failed to reserve " << succinctBytes(lastBuildBytesToReserve) - << " for last build memory pool " << pool()->name() + LOG(WARNING) << "Failed to reserve " << succinctBytes(memoryBytesToReserve) + << " for join table build from last hash build operator " + << pool()->name() << ", usage: " << succinctBytes(pool()->usedBytes()) << ", reservation: " << succinctBytes(pool()->reservedBytes()); } diff --git a/velox/exec/HashTable.cpp b/velox/exec/HashTable.cpp index 3a15da01614e..78aa468fc419 100644 --- a/velox/exec/HashTable.cpp +++ b/velox/exec/HashTable.cpp @@ -735,12 +735,6 @@ void HashTable::allocateTables( template void HashTable::clear(bool freeTable) { - if (otherTables_.size() > 0) { - rows_->clearNextRowVectors(); - for (auto i = 0; i < otherTables_.size(); ++i) { - otherTables_[i]->rows()->clearNextRowVectors(); - } - } for (auto* rowContainer : allRows()) { rowContainer->clear(); } @@ -954,6 +948,12 @@ void HashTable::parallelJoinBuild() { } // The parallel table building step. + VELOX_CHECK(joinInsertAllocators_.empty()); + joinInsertAllocators_.reserve(otherTables_.size()); + for (int i = 0; i < otherTables_.size(); ++i) { + joinInsertAllocators_.push_back( + std::make_unique(rows_->pool())); + } std::vector> overflowPerPartition(numPartitions); for (auto i = 0; i < numPartitions; ++i) { buildSteps.push_back(std::make_shared>( @@ -984,7 +984,8 @@ void HashTable::parallelJoinBuild() { overflows.data(), hashes.data(), overflows.size(), - nullptr); + nullptr, + &rows_->stringAllocator()); VELOX_CHECK_EQ(table->rows()->numRows(), table->numParallelBuildRows_); } @@ -1052,8 +1053,11 @@ void HashTable::buildJoinPartition( buildPartitionBounds_[partition], buildPartitionBounds_[partition + 1], overflow}; - auto* rowContainer = - (partition == 0 ? this : otherTables_[partition - 1].get())->rows(); + const int partitionNum = partition; + auto* allocator = (partitionNum == 0) + ? &rows_->stringAllocator() + : joinInsertAllocators_[partitionNum - 1].get(); + VELOX_CHECK_NOT_NULL(allocator); for (auto i = 0; i < numPartitions; ++i) { auto* table = i == 0 ? this : otherTables_[i - 1].get(); RowContainerIterator iter; @@ -1061,7 +1065,12 @@ void HashTable::buildJoinPartition( iter, partition, kBatch, *rowPartitions[i], rows.data())) { hashRows(folly::Range(rows.data(), numRows), false, hashes); insertForJoin( - rowContainer, rows.data(), hashes.data(), numRows, &partitionInfo); + table->rows_.get(), + rows.data(), + hashes.data(), + numRows, + &partitionInfo, + allocator); table->numParallelBuildRows_ += numRows; } } @@ -1077,7 +1086,13 @@ bool HashTable::insertBatch( return false; } if (isJoinBuild_) { - insertForJoin(rows(), groups, hashes.data(), numGroups); + insertForJoin( + rows(), + groups, + hashes.data(), + numGroups, + nullptr, + &rows_->stringAllocator()); } else { insertForGroupBy(groups, hashes.data(), numGroups); } @@ -1137,12 +1152,16 @@ void HashTable::insertForGroupBy( } template -bool HashTable::arrayPushRow(char* row, int32_t index) { +bool HashTable::arrayPushRow( + RowContainer* rows, + char* row, + int32_t index, + HashStringAllocator* allocator) { auto* existingRow = table_[index]; if (existingRow != nullptr) { if (nextOffset_ > 0) { hasDuplicates_ = true; - rows_->appendNextRow(existingRow, row); + rows->appendNextRow(existingRow, row, allocator); } return false; } @@ -1154,10 +1173,11 @@ template void HashTable::pushNext( RowContainer* rows, char* row, - char* next) { + char* next, + HashStringAllocator* allocator) { VELOX_CHECK_GT(nextOffset_, 0); hasDuplicates_ = true; - rows->appendNextRow(row, next); + rows->appendNextRow(row, next, allocator); } template @@ -1168,7 +1188,8 @@ FOLLY_ALWAYS_INLINE void HashTable::buildFullProbe( uint64_t hash, char* inserted, bool extraCheck, - TableInsertPartitionInfo* partitionInfo) { + TableInsertPartitionInfo* partitionInfo, + HashStringAllocator* allocator) { constexpr int32_t kKeyOffset = -static_cast(sizeof(normalized_key_t)); auto insertFn = [&](int32_t /*row*/, PartitionBoundIndexType index) { @@ -1187,7 +1208,7 @@ FOLLY_ALWAYS_INLINE void HashTable::buildFullProbe( if (RowContainer::normalizedKey(group) == RowContainer::normalizedKey(inserted)) { if (nextOffset_ > 0) { - pushNext(rows, group, inserted); + pushNext(rows, group, inserted, allocator); } return true; } @@ -1204,7 +1225,7 @@ FOLLY_ALWAYS_INLINE void HashTable::buildFullProbe( [&](char* group, int32_t /*row*/) { if (compareKeys(group, inserted)) { if (nextOffset_ > 0) { - pushNext(rows, group, inserted); + pushNext(rows, group, inserted, allocator); } return true; } @@ -1224,7 +1245,8 @@ FOLLY_ALWAYS_INLINE void HashTable::insertForJoinWithPrefetch( char** groups, uint64_t* hashes, int32_t numGroups, - TableInsertPartitionInfo* partitionInfo) { + TableInsertPartitionInfo* partitionInfo, + HashStringAllocator* allocator) { auto i = 0; ProbeState states[kPrefetchSize]; constexpr int32_t kKeyOffset = @@ -1244,14 +1266,20 @@ FOLLY_ALWAYS_INLINE void HashTable::insertForJoinWithPrefetch( for (int32_t j = 0; j < kPrefetchSize; ++j) { auto index = i + j; buildFullProbe( - rows, states[j], hashes[index], groups[index], j != 0, partitionInfo); + rows, + states[j], + hashes[index], + groups[index], + j != 0, + partitionInfo, + allocator); } } for (; i < numGroups; ++i) { states[0].preProbe(*this, hashes[i], i); states[0].firstProbe(*this, keyOffset); buildFullProbe( - rows, states[0], hashes[i], groups[i], false, partitionInfo); + rows, states[0], hashes[i], groups[i], false, partitionInfo, allocator); } } @@ -1261,23 +1289,24 @@ void HashTable::insertForJoin( char** groups, uint64_t* hashes, int32_t numGroups, - TableInsertPartitionInfo* partitionInfo) { + TableInsertPartitionInfo* partitionInfo, + HashStringAllocator* allocator) { // The insertable rows are in the table, all get put in the hash table or // array. if (hashMode_ == HashMode::kArray) { for (auto i = 0; i < numGroups; ++i) { auto index = hashes[i]; VELOX_CHECK_LT(index, capacity_); - arrayPushRow(groups[i], index); + arrayPushRow(rows, groups[i], index, allocator); } return; } if (hashMode_ == HashMode::kNormalizedKey) { insertForJoinWithPrefetch( - rows, groups, hashes, numGroups, partitionInfo); + rows, groups, hashes, numGroups, partitionInfo, allocator); } else { insertForJoinWithPrefetch( - rows, groups, hashes, numGroups, partitionInfo); + rows, groups, hashes, numGroups, partitionInfo, allocator); } } diff --git a/velox/exec/HashTable.h b/velox/exec/HashTable.h index 4726960a2b6f..549733178d90 100644 --- a/velox/exec/HashTable.h +++ b/velox/exec/HashTable.h @@ -460,14 +460,7 @@ class HashTable : public BaseHashTable { memory::MemoryPool* pool, const std::shared_ptr& stringArena = nullptr); - ~HashTable() override { - if (otherTables_.size() > 0) { - rows_->clearNextRowVectors(); - for (auto i = 0; i < otherTables_.size(); ++i) { - otherTables_[i]->rows()->clearNextRowVectors(); - } - } - } + ~HashTable() override = default; static std::unique_ptr createForAggregation( std::vector>&& hashers, @@ -818,13 +811,15 @@ class HashTable : public BaseHashTable { // partition info for parallel join table build. It specifies the first and // (exclusive) last indexes of the insert entries in the table. If a row // can't be inserted within this range, it is not inserted but rather added - // to the end of 'overflows' in 'partitionInfo'. + // to the end of 'overflows' in 'partitionInfo'. 'allocator' is provided for + // duplicate row vector allocations. void insertForJoin( RowContainer* rows, char** groups, uint64_t* hashes, int32_t numGroups, - TableInsertPartitionInfo* = nullptr); + TableInsertPartitionInfo* partitionInfo, + HashStringAllocator* allocator); // Inserts 'numGroups' entries into 'this'. 'groups' point to // contents in a RowContainer owned by 'this'. 'hashes' are the hash @@ -903,16 +898,25 @@ class HashTable : public BaseHashTable { const std::vector& columns, const char* row) const; - // Adds a row to a hash join table in kArray hash mode. Returns true - // if a new entry was made and false if the row was added to an - // existing set of rows with the same key. - bool arrayPushRow(char* row, int32_t index); - - // Adds a row to a hash join build side entry with multiple rows - // with the same key. - // 'rows' should be the same as the one in hash table except for - // 'parallelJoinBuild'. - void pushNext(RowContainer* rows, char* row, char* next); + // Adds a row to a hash join table in kArray hash mode. Returns true if a new + // entry was made and false if the row was added to an existing set of rows + // with the same key. 'allocator' is provided for duplicate row vector + // allocations. + bool arrayPushRow( + RowContainer* rows, + char* row, + int32_t index, + HashStringAllocator* allocator); + + // Adds a row to a hash join build side entry with multiple rows with the same + // key. 'rows' should be the same as the one in hash table except for + // 'parallelJoinBuild'. 'allocator' is provided for duplicate row vector + // allocations. + void pushNext( + RowContainer* rows, + char* row, + char* next, + HashStringAllocator* allocator); // Finishes inserting an entry into a join hash table. If 'partitionInfo' is // not null and the insert falls out-side of the partition range, then insert @@ -924,7 +928,8 @@ class HashTable : public BaseHashTable { uint64_t hash, char* row, bool extraCheck, - TableInsertPartitionInfo* partitionInfo); + TableInsertPartitionInfo* partitionInfo, + HashStringAllocator* allocator); template void insertForJoinWithPrefetch( @@ -932,7 +937,8 @@ class HashTable : public BaseHashTable { char** groups, uint64_t* hashes, int32_t numGroups, - TableInsertPartitionInfo* partitionInfo); + TableInsertPartitionInfo* partitionInfo, + HashStringAllocator* allocator); // Updates 'hashers_' to correspond to the keys in the // content. Returns true if all hashers offer a mapping to value ids @@ -1046,6 +1052,10 @@ class HashTable : public BaseHashTable { // Owns the memory of multiple build side hash join tables that are // combined into a single probe hash table. std::vector>> otherTables_; + // The allocators used for duplicate row vector allocations under parallel + // join insert with one per each parallel join partition. These allocators + // all allocate memory from the memory pool of the top level memory pool. + std::vector> joinInsertAllocators_; // Statistics maintained if kTrackLoads is set. // Flags indicate whether the same column in all build-side join hash tables diff --git a/velox/exec/Operator.cpp b/velox/exec/Operator.cpp index 67afed0eafa3..f4ed6b1ece3d 100644 --- a/velox/exec/Operator.cpp +++ b/velox/exec/Operator.cpp @@ -657,12 +657,6 @@ uint64_t Operator::MemoryReclaimer::reclaim( memory::ScopedReclaimedBytesRecorder recoder(pool, &reclaimedBytes); op_->reclaim(targetBytes, stats); } - // NOTE: the parallel hash build is running at the background thread - // pool which won't stop during memory reclamation so the operator's - // memory usage might increase in such case. memory usage. - if (op_->operatorType() == "HashBuild") { - reclaimedBytes = std::max(0, reclaimedBytes); - } VELOX_CHECK_GE( reclaimedBytes, 0, diff --git a/velox/exec/RowContainer.cpp b/velox/exec/RowContainer.cpp index 9f1282a138e1..b83050982b49 100644 --- a/velox/exec/RowContainer.cpp +++ b/velox/exec/RowContainer.cpp @@ -144,12 +144,12 @@ RowContainer::RowContainer( : keyTypes_(keyTypes), nullableKeys_(nullableKeys), isJoinBuild_(isJoinBuild), - accumulators_(accumulators), hasNormalizedKeys_(hasNormalizedKeys), - rows_(pool), stringAllocator_( stringAllocator ? stringAllocator - : std::make_shared(pool)) { + : std::make_shared(pool)), + accumulators_(accumulators), + rows_(pool) { // Compute the layout of the payload row. The row has keys, null flags, // accumulators, dependent fields. All fields are fixed width. If variable // width data is referenced, this is done with StringView(for VARCHAR) and @@ -392,13 +392,15 @@ int32_t RowContainer::findRows(folly::Range rows, char** result) { return numRows; } -void RowContainer::appendNextRow(char* current, char* nextRow) { +void RowContainer::appendNextRow( + char* current, + char* nextRow, + HashStringAllocator* allocator) { VELOX_CHECK(getNextRowVector(nextRow) == nullptr); NextRowVector*& nextRowArrayPtr = getNextRowVector(current); if (nextRowArrayPtr == nullptr) { - nextRowArrayPtr = - new (stringAllocator_->allocate(kNextRowVectorSize)->begin()) - NextRowVector(StlAllocator(stringAllocator_.get())); + nextRowArrayPtr = new (allocator->allocate(kNextRowVectorSize)->begin()) + NextRowVector(StlAllocator(allocator)); hasDuplicateRows_ = true; nextRowArrayPtr->emplace_back(current); } @@ -935,8 +937,7 @@ void RowContainer::hash( void RowContainer::clear() { const bool sharedStringAllocator = !stringAllocator_.unique(); - if (checkFree_ || sharedStringAllocator || usesExternalMemory_ || - hasDuplicateRows_) { + if (sharedStringAllocator || usesExternalMemory_) { constexpr int32_t kBatch = 1000; std::vector rows(kBatch); RowContainerIterator iter; @@ -944,11 +945,10 @@ void RowContainer::clear() { freeRowsExtraMemory(folly::Range(rows.data(), numRows), true); } } + hasDuplicateRows_ = false; + rows_.clear(); if (!sharedStringAllocator) { - if (checkFree_) { - stringAllocator_->checkEmpty(); - } stringAllocator_->clear(); } numRows_ = 0; @@ -958,18 +958,6 @@ void RowContainer::clear() { firstFreeRow_ = nullptr; } -void RowContainer::clearNextRowVectors() { - if (hasDuplicateRows_) { - constexpr int32_t kBatch = 1000; - std::vector rows(kBatch); - RowContainerIterator iter; - while (auto numRows = listRows(&iter, kBatch, rows.data())) { - freeNextRowVectors(folly::Range(rows.data(), numRows), true); - } - hasDuplicateRows_ = false; - } -} - void RowContainer::setProbedFlag(char** rows, int32_t numRows) { for (auto i = 0; i < numRows; i++) { // Row may be null in case of a FULL join. diff --git a/velox/exec/RowContainer.h b/velox/exec/RowContainer.h index 8c7f7b56805e..5fe625636e4c 100644 --- a/velox/exec/RowContainer.h +++ b/velox/exec/RowContainer.h @@ -686,8 +686,10 @@ class RowContainer { /// Creates a next-row-vector if it doesn't exist. Appends the row address to /// the next-row-vector, and store the address of the next-row-vector in the - /// 'nextOffset_' slot for all duplicate rows. - void appendNextRow(char* current, char* nextRow); + /// 'nextOffset_' slot for all duplicate rows. 'allocator' is provided for the + /// duplicate row vector allocations. + void + appendNextRow(char* current, char* nextRow, HashStringAllocator* allocator); NextRowVector*& getNextRowVector(char* row) const { return *reinterpret_cast(row + nextOffset_); @@ -725,9 +727,6 @@ class RowContainer { /// Resets the state to be as after construction. Frees memory for payload. void clear(); - /// Frees memory for next row vectors. - void clearNextRowVectors(); - int32_t compareRows( const char* left, const char* right, @@ -808,10 +807,6 @@ class RowContainer { return mutable_; } - bool checkFree() const { - return checkFree_; - } - /// Returns a summary of the container: key types, dependent types, number of /// accumulators and number of rows. std::string toString() const; @@ -1281,7 +1276,6 @@ class RowContainer { CompareFlags flags = CompareFlags()); // Free variable-width fields at column `column_index` associated with the - // 'rows', and if 'checkFree_' is true, zero out complex-typed field in // 'rows'. `FieldType` is the type of data representation of the fields in // row, and can be one of StringView(represents VARCHAR) and // std::string_view(represents ARRAY, MAP or ROW). @@ -1310,9 +1304,6 @@ class RowContainer { } } stringAllocator_->free(HashStringAllocator::headerOf(view.data())); - if (checkFree_) { - view = FieldType(); - } } } @@ -1334,11 +1325,12 @@ class RowContainer { columnHasNulls_[columnIndex] = columnHasNulls_[columnIndex] || hasNulls; } - const bool checkFree_ = false; - const std::vector keyTypes_; const bool nullableKeys_; const bool isJoinBuild_; + // True if normalized keys are enabled in initial state. + const bool hasNormalizedKeys_; + const std::shared_ptr stringAllocator_; std::vector columnHasNulls_; @@ -1355,6 +1347,8 @@ class RowContainer { std::vector types_; std::vector typeKinds_; int32_t nextOffset_ = 0; + // Indicates if this row container has rows with duplicate keys. This only + // applies if 'nextOffset_' is set. bool hasDuplicateRows_{false}; // Bit position of null bit in the row. 0 if no null flag. Order is keys, // accumulators, dependent. @@ -1375,8 +1369,6 @@ class RowContainer { int32_t fixedRowSize_; // How many bytes do the flags (null, probed, free) occupy. int32_t flagBytes_; - // True if normalized keys are enabled in initial state. - const bool hasNormalizedKeys_; // The count of entries that have an extra normalized_key_t before the // start. int64_t numRowsWithNormalizedKey_ = 0; @@ -1395,7 +1387,6 @@ class RowContainer { uint64_t numFreeRows_ = 0; memory::AllocationPool rows_; - std::shared_ptr stringAllocator_; int alignment_ = 1; }; diff --git a/velox/exec/benchmarks/HashJoinListResultBenchmark.cpp b/velox/exec/benchmarks/HashJoinListResultBenchmark.cpp index aba2a254068b..6945900aee89 100644 --- a/velox/exec/benchmarks/HashJoinListResultBenchmark.cpp +++ b/velox/exec/benchmarks/HashJoinListResultBenchmark.cpp @@ -166,15 +166,18 @@ struct HashTableBenchmarkResult { double eraseClock{0}; + uint64_t peakMemoryBytes{0}; + // The mode of the table. BaseHashTable::HashMode hashMode; - void merge(HashTableBenchmarkResult other) { - numIter++; + void merge(const HashTableBenchmarkResult& other) { + ++numIter; buildClocks += other.buildClocks; listJoinResultClocks += other.listJoinResultClocks; totalClock += other.totalClock; eraseClock += other.eraseClock; + peakMemoryBytes += other.peakMemoryBytes; } std::string toString() const { @@ -186,7 +189,8 @@ struct HashTableBenchmarkResult { << " listJoinResultClocks=" << listJoinResultClocks << "(" << (listJoinResultClocks / totalClock * 100) << "%) buildClocks=" << buildClocks << "(" - << (buildClocks / totalClock * 100) << "%)"; + << (buildClocks / totalClock * 100) << "%)" + << " peakMemoryBytes=" << succinctBytes(peakMemoryBytes); if (params.runErase) { out << " eraseClock=" << eraseClock << "(" << (eraseClock / totalClock * 100) << "%)"; @@ -201,13 +205,21 @@ class HashTableListJoinResultBenchmark : public VectorTestBase { : randomEngine_((std::random_device{}())) {} HashTableBenchmarkResult run(HashTableBenchmarkParams params) { + std::shared_ptr tableAggregatePool = + rootPool_->addAggregateChild("tableAggregate"); + std::vector> tablePools; + tablePools.reserve(params.numTables); + for (int i = 0; i < params_.numTables; ++i) { + tablePools.push_back( + tableAggregatePool->addLeafChild(fmt::format("table{}", i))); + } params_ = params; HashTableBenchmarkResult result; result.params = params_; - SelectivityInfo totalClock; + uint64_t totalClocks{0}; { - SelectivityTimer timer(totalClock, 0); - buildTable(); + ClockTimer timer(totalClocks); + buildTable(tablePools); result.numOutput = probeTableAndListResult(); result.hashMode = topTable_->hashMode(); VELOX_CHECK_EQ(result.hashMode, params_.mode); @@ -219,8 +231,8 @@ class HashTableListJoinResultBenchmark : public VectorTestBase { } result.buildClocks += buildTime_; result.listJoinResultClocks += listJoinResultTime_; - result.totalClock += totalClock.timeToDropValue(); - + result.totalClock = totalClocks; + result.peakMemoryBytes = tableAggregatePool->peakBytes(); return result; } @@ -348,7 +360,8 @@ class HashTableListJoinResultBenchmark : public VectorTestBase { } // Prepare join table. - void buildTable() { + void buildTable( + const std::vector>& tablePools) { std::vector dependentTypes; std::vector> otherTables; std::vector batches; @@ -363,7 +376,7 @@ class HashTableListJoinResultBenchmark : public VectorTestBase { true, false, 1'000, - pool_.get()); + tablePools[i].get()); copyVectorsToTable(batches[i], table.get()); if (i == 0) { @@ -372,15 +385,15 @@ class HashTableListJoinResultBenchmark : public VectorTestBase { otherTables.push_back(std::move(table)); } } - SelectivityInfo buildClocks; + uint64_t buildClocks{0}; { - SelectivityTimer timer(buildClocks, 0); + ClockTimer timer(buildClocks); topTable_->prepareJoinTable( std::move(otherTables), BaseHashTable::kNoSpillInputStartPartitionBit, executor_.get()); } - buildTime_ = buildClocks.timeToDropValue(); + buildTime_ = buildClocks; } void probeTable( @@ -428,9 +441,8 @@ class HashTableListJoinResultBenchmark : public VectorTestBase { // Hash probe and list join result. int64_t probeTableAndListResult() { auto lookup = std::make_unique(topTable_->hashers()); - auto numBatch = params_.probeSize / params_.hashTableSize; - auto batchSize = params_.hashTableSize; - SelectivityInfo listJoinResultClocks; + const auto numBatch = params_.probeSize / params_.hashTableSize; + const auto batchSize = params_.hashTableSize; BufferPtr outputRowMapping; auto outputBatchSize = batchSize; std::vector outputTableRows; @@ -444,8 +456,9 @@ class HashTableListJoinResultBenchmark : public VectorTestBase { auto mapping = initializeRowNumberMapping( outputRowMapping, outputBatchSize, pool_.get()); outputTableRows.resize(outputBatchSize); + uint64_t listJoinResultClocks{0}; { - SelectivityTimer timer(listJoinResultClocks, 0); + ClockTimer timer(listJoinResultClocks); while (!resultsIter.atEnd()) { numJoinListResult += topTable_->listJoinResults( resultsIter, @@ -455,8 +468,8 @@ class HashTableListJoinResultBenchmark : public VectorTestBase { std::numeric_limits::max()); } } + listJoinResultTime_ += listJoinResultClocks; } - listJoinResultTime_ = listJoinResultClocks.timeToDropValue(); return numJoinListResult; } @@ -464,7 +477,6 @@ class HashTableListJoinResultBenchmark : public VectorTestBase { auto lookup = std::make_unique(topTable_->hashers()); auto batchSize = 10000; auto mode = topTable_->hashMode(); - SelectivityInfo eraseClock; BufferPtr outputRowMapping; auto outputBatchSize = topTable_->rows()->numRows() + 2; std::vector outputTableRows; @@ -482,12 +494,13 @@ class HashTableListJoinResultBenchmark : public VectorTestBase { mapping, folly::Range(outputTableRows.data(), outputTableRows.size()), std::numeric_limits::max()); + uint64_t eraseClocks{0}; { - SelectivityTimer timer(eraseClock, 0); + ClockTimer timer(eraseClocks); topTable_->rows()->eraseRows( folly::Range(outputTableRows.data(), num)); } - eraseTime_ += eraseClock.timeToDropValue(); + eraseTime_ = eraseClocks; } std::default_random_engine randomEngine_; diff --git a/velox/exec/tests/RowContainerTest.cpp b/velox/exec/tests/RowContainerTest.cpp index 26a666fe2d81..129efad3411e 100644 --- a/velox/exec/tests/RowContainerTest.cpp +++ b/velox/exec/tests/RowContainerTest.cpp @@ -1951,15 +1951,15 @@ TEST_F(RowContainerTest, nextRowVector) { }; auto validateNextRowVector = [&]() { - for (int i = 0; i < rows.size(); i++) { + for (int i = 0; i < rows.size(); ++i) { auto vector = data->getNextRowVector(rows[i]); if (vector) { auto iter = std::find(vector->begin(), vector->end(), rows[i]); - EXPECT_NE(iter, vector->end()); - EXPECT_TRUE(vector->size() <= 2 && vector->size() > 0); + ASSERT_NE(iter, vector->end()); + ASSERT_TRUE(vector->size() <= 2 && vector->size() > 0); for (auto next : *vector) { - EXPECT_EQ(data->getNextRowVector(next), vector); - EXPECT_TRUE(std::find(rows.begin(), rows.end(), next) != rows.end()); + ASSERT_EQ(data->getNextRowVector(next), vector); + ASSERT_TRUE(std::find(rows.begin(), rows.end(), next) != rows.end()); } } } @@ -1969,18 +1969,18 @@ TEST_F(RowContainerTest, nextRowVector) { for (int i = 0; i < numRows; ++i) { rows.push_back(data->newRow()); rowSet.insert(rows.back()); - EXPECT_EQ(data->getNextRowVector(rows.back()), nullptr); + ASSERT_EQ(data->getNextRowVector(rows.back()), nullptr); } - EXPECT_EQ(numRows, data->numRows()); + ASSERT_EQ(numRows, data->numRows()); std::vector rowsFromContainer(numRows); RowContainerIterator iter; - EXPECT_EQ( + ASSERT_EQ( data->listRows(&iter, numRows, rowsFromContainer.data()), numRows); - EXPECT_EQ(0, data->listRows(&iter, numRows, rows.data())); - EXPECT_EQ(rows, rowsFromContainer); + ASSERT_EQ(0, data->listRows(&iter, numRows, rows.data())); + ASSERT_EQ(rows, rowsFromContainer); for (int i = 0; i + 2 <= numRows; i += 2) { - data->appendNextRow(rows[i], rows[i + 1]); + data->appendNextRow(rows[i], rows[i + 1], &data->stringAllocator()); } validateNextRowVector(); }; @@ -2008,7 +2008,6 @@ TEST_F(RowContainerTest, nextRowVector) { std::vector eraseRows(numRows); std::iota(eraseRows.begin(), eraseRows.end(), 0); nextRowVectorEraseValidation(eraseRows); - VELOX_ASSERT_THROW( nextRowVectorEraseValidation({1}), "All rows with the same keys must be present in 'rows'");