diff --git a/velox/common/memory/AllocationPool.h b/velox/common/memory/AllocationPool.h
index ff3fee4442f0..1449be6b9af9 100644
--- a/velox/common/memory/AllocationPool.h
+++ b/velox/common/memory/AllocationPool.h
@@ -18,13 +18,12 @@
 #include "velox/common/memory/Memory.h"
 
 namespace facebook::velox::memory {
-// A set of Allocations holding the fixed width payload
-// rows. The Runs are filled to the end except for the last one. This
-// is used for iterating over the payload for rehashing, returning
-// results etc. This is used via HashStringAllocator for variable length
-// allocation for backing ByteStreams for complex objects. In that case, there
-// is a current run that is appended to and when this is exhausted a new run is
-// started.
+/// A set of Allocations holding the fixed width payload rows. The Runs are
+/// filled to the end except for the last one. This is used for iterating over
+/// the payload for rehashing, returning results, etc. This is used via
+/// HashStringAllocator for variable length allocation for backing ByteStreams
+/// for complex objects. In that case, there is a current run that is appended
+/// to and when this is exhausted a new run is started.
 class AllocationPool {
  public:
   static constexpr int32_t kMinPages = 16;
diff --git a/velox/common/memory/HashStringAllocator.cpp b/velox/common/memory/HashStringAllocator.cpp
index 1a938a0dd571..8cb6781a16e3 100644
--- a/velox/common/memory/HashStringAllocator.cpp
+++ b/velox/common/memory/HashStringAllocator.cpp
@@ -793,10 +793,4 @@ int64_t HashStringAllocator::checkConsistency() const {
 bool HashStringAllocator::isEmpty() const {
   return state_.sizeFromPool() == 0 && checkConsistency() == 0;
 }
-
-void HashStringAllocator::checkEmpty() const {
-  VELOX_CHECK_EQ(0, state_.sizeFromPool());
-  VELOX_CHECK_EQ(0, checkConsistency());
-}
-
 } // namespace facebook::velox
diff --git a/velox/common/memory/HashStringAllocator.h b/velox/common/memory/HashStringAllocator.h
index 4604b7565c4f..485da69966d7 100644
--- a/velox/common/memory/HashStringAllocator.h
+++ b/velox/common/memory/HashStringAllocator.h
@@ -341,12 +341,6 @@ class HashStringAllocator : public StreamArena {
   /// checkConsistency() which makes it slow. Do not use in hot paths.
   bool isEmpty() const;
 
-  /// Throws if 'this' is not empty. Checks consistency of
-  /// 'this'. This is a fast check for RowContainer users freeing the
-  /// variable length data they store. Can be used in non-debug
-  /// builds.
-  void checkEmpty() const;
-
   std::string toString() const;
 
   /// Effectively makes this immutable while executing f, any attempt to access
diff --git a/velox/exec/GroupingSet.cpp b/velox/exec/GroupingSet.cpp
index 02be45e15748..5e478bef8094 100644
--- a/velox/exec/GroupingSet.cpp
+++ b/velox/exec/GroupingSet.cpp
@@ -1371,9 +1371,6 @@ void GroupingSet::toIntermediate(
   if (intermediateRows_) {
     intermediateRows_->eraseRows(folly::Range<char**>(
        intermediateGroups_.data(), intermediateGroups_.size()));
-    if (intermediateRows_->checkFree()) {
-      intermediateRows_->stringAllocator().checkEmpty();
-    }
   }
 
   // It's unnecessary to call function->clear() to reset the internal states of
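Note for reviewers: with `checkEmpty()` removed, a call site that still wants a hard assertion can wrap the retained `isEmpty()`. A minimal sketch (`verifyDrained` is a hypothetical helper, not part of this patch); since `isEmpty()` runs `checkConsistency()` internally, keep it out of hot paths:

```cpp
// Hypothetical caller-side replacement for the removed checkEmpty():
// isEmpty() already checks sizeFromPool() == 0 && checkConsistency() == 0.
void verifyDrained(const facebook::velox::HashStringAllocator& allocator) {
  VELOX_CHECK(allocator.isEmpty(), "HashStringAllocator is not empty");
}
```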
diff --git a/velox/exec/HashBuild.cpp b/velox/exec/HashBuild.cpp
index ea14000e2886..6a4f6e3ab1d8 100644
--- a/velox/exec/HashBuild.cpp
+++ b/velox/exec/HashBuild.cpp
@@ -734,15 +734,8 @@ bool HashBuild::finishHashBuild() {
 
   SCOPE_EXIT {
     // Make a guard to release the unused memory reservation since we have
-    // finished the merged table build. The guard makes sure we release the
-    // memory reserved for other operators even when exceptions are thrown to
-    // prevent memory leak. We cannot rely on other operator's cleanup mechanism
-    // because when exceptions are thrown, other operator's cleanup mechanism
-    // might already have finished.
+    // finished the merged table build.
     pool()->release();
-    for (auto* build : otherBuilds) {
-      build->pool()->release();
-    }
   };
 
   // TODO: Re-enable parallel join build with spilling triggered after
@@ -800,59 +793,19 @@ void HashBuild::ensureTableFits(
   TestValue::adjust("facebook::velox::exec::HashBuild::ensureTableFits", this);
 
-  const auto dupRowOverheadBytes = sizeof(char*) + sizeof(NextRowVector);
-
   uint64_t totalNumRows{0};
-  uint64_t lastBuildBytesToReserve{0};
-  bool allowDuplicateRows{false};
   {
     std::lock_guard<std::mutex> l(mutex_);
-    const auto numRows = table_->rows()->numRows();
-    totalNumRows += numRows;
-    allowDuplicateRows = table_->rows()->nextOffset() != 0;
-    if (allowDuplicateRows) {
-      lastBuildBytesToReserve += numRows * dupRowOverheadBytes;
-    }
+    totalNumRows += table_->rows()->numRows();
   }
 
-  for (auto i = 0; i < otherTables.size(); i++) {
+  for (auto i = 0; i < otherTables.size(); ++i) {
     auto& otherTable = otherTables[i];
     VELOX_CHECK_NOT_NULL(otherTable);
     auto& otherBuild = otherBuilds[i];
-    const auto& rowContainer = otherTable->rows();
-    int64_t numRows{0};
     {
       std::lock_guard<std::mutex> l(otherBuild->mutex_);
-      numRows = rowContainer->numRows();
-    }
-    if (numRows == 0) {
-      continue;
-    }
-
-    totalNumRows += numRows;
-    if (!allowDuplicateRows) {
-      continue;
-    }
-
-    const auto dupRowBytesToReserve = numRows * dupRowOverheadBytes;
-    if (!isParallelJoin) {
-      lastBuildBytesToReserve += dupRowBytesToReserve;
-      continue;
-    }
-
-    Operator::ReclaimableSectionGuard guard(otherBuild);
-    auto* otherPool = otherBuild->pool();
-
-    // Reserve memory for memory allocations for next-row-vectors in
-    // otherBuild operators if it is parallel join build. Otherwise all
-    // next-row-vectors shall be allocated from the last build operator.
-    if (!otherPool->maybeReserve(dupRowBytesToReserve)) {
-      LOG(WARNING)
-          << "Failed to reserve " << succinctBytes(dupRowBytesToReserve)
-          << " for for duplicate row memory allocation from non-last memory pool "
-          << otherPool->name()
-          << ", usage: " << succinctBytes(otherPool->usedBytes())
-          << ", reservation: " << succinctBytes(otherPool->reservedBytes());
+      totalNumRows += otherTable->rows()->numRows();
     }
   }
 
@@ -862,16 +815,20 @@ void HashBuild::ensureTableFits(
 
   // NOTE: reserve a bit more memory to consider the extra memory used for
   // parallel table build operation.
-  lastBuildBytesToReserve += table_->estimateHashTableSize(totalNumRows) * 1.1;
+  //
+  // TODO: make the reservation headroom configurable through query config.
+  const uint64_t memoryBytesToReserve =
+      table_->estimateHashTableSize(totalNumRows) * 1.1;
   {
     Operator::ReclaimableSectionGuard guard(this);
-    if (pool()->maybeReserve(lastBuildBytesToReserve)) {
+    if (pool()->maybeReserve(memoryBytesToReserve)) {
       return;
     }
   }
 
-  LOG(WARNING) << "Failed to reserve " << succinctBytes(lastBuildBytesToReserve)
-               << " for last build memory pool " << pool()->name()
+  LOG(WARNING) << "Failed to reserve " << succinctBytes(memoryBytesToReserve)
+               << " for join table build from last hash build operator "
+               << pool()->name()
                << ", usage: " << succinctBytes(pool()->usedBytes())
                << ", reservation: " << succinctBytes(pool()->reservedBytes());
 }
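For a sense of the numbers in the simplified `ensureTableFits()` above: the last hash build operator now makes one reservation for the whole merged table instead of per-operator duplicate-row reservations. A worked sketch with illustrative values (names follow the hunk; the 1.1 headroom factor is hard-coded pending the TODO):

```cpp
// If estimateHashTableSize(totalNumRows) returns ~1 GB, the last build
// operator asks its pool for ~1.1 GB. The reservation is best-effort:
// failure is only logged and the build proceeds anyway.
const uint64_t memoryBytesToReserve =
    table_->estimateHashTableSize(totalNumRows) * 1.1; // e.g. ~1.1 GB
if (!pool()->maybeReserve(memoryBytesToReserve)) {
  // LOG(WARNING) and continue; see the hunk above.
}
```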
diff --git a/velox/exec/HashTable.cpp b/velox/exec/HashTable.cpp
index 3a15da01614e..78aa468fc419 100644
--- a/velox/exec/HashTable.cpp
+++ b/velox/exec/HashTable.cpp
@@ -735,12 +735,6 @@ void HashTable<ignoreNullKeys>::allocateTables(
 
 template <bool ignoreNullKeys>
 void HashTable<ignoreNullKeys>::clear(bool freeTable) {
-  if (otherTables_.size() > 0) {
-    rows_->clearNextRowVectors();
-    for (auto i = 0; i < otherTables_.size(); ++i) {
-      otherTables_[i]->rows()->clearNextRowVectors();
-    }
-  }
   for (auto* rowContainer : allRows()) {
     rowContainer->clear();
   }
@@ -954,6 +948,12 @@ void HashTable<ignoreNullKeys>::parallelJoinBuild() {
   }
 
   // The parallel table building step.
+  VELOX_CHECK(joinInsertAllocators_.empty());
+  joinInsertAllocators_.reserve(otherTables_.size());
+  for (int i = 0; i < otherTables_.size(); ++i) {
+    joinInsertAllocators_.push_back(
+        std::make_unique<HashStringAllocator>(rows_->pool()));
+  }
   std::vector<std::vector<char*>> overflowPerPartition(numPartitions);
   for (auto i = 0; i < numPartitions; ++i) {
     buildSteps.push_back(std::make_shared<AsyncSource<bool>>(
@@ -984,7 +984,8 @@ void HashTable<ignoreNullKeys>::parallelJoinBuild() {
         overflows.data(),
         hashes.data(),
         overflows.size(),
-        nullptr);
+        nullptr,
+        &rows_->stringAllocator());
     VELOX_CHECK_EQ(table->rows()->numRows(), table->numParallelBuildRows_);
   }
@@ -1052,8 +1053,11 @@ void HashTable<ignoreNullKeys>::buildJoinPartition(
       buildPartitionBounds_[partition],
       buildPartitionBounds_[partition + 1],
       overflow};
-  auto* rowContainer =
-      (partition == 0 ? this : otherTables_[partition - 1].get())->rows();
+  const int partitionNum = partition;
+  auto* allocator = (partitionNum == 0)
+      ? &rows_->stringAllocator()
+      : joinInsertAllocators_[partitionNum - 1].get();
+  VELOX_CHECK_NOT_NULL(allocator);
   for (auto i = 0; i < numPartitions; ++i) {
     auto* table = i == 0 ? this : otherTables_[i - 1].get();
     RowContainerIterator iter;
@@ -1061,7 +1065,12 @@
         iter, partition, kBatch, *rowPartitions[i], rows.data())) {
       hashRows(folly::Range<char**>(rows.data(), numRows), false, hashes);
       insertForJoin(
-          rowContainer, rows.data(), hashes.data(), numRows, &partitionInfo);
+          table->rows_.get(),
+          rows.data(),
+          hashes.data(),
+          numRows,
+          &partitionInfo,
+          allocator);
       table->numParallelBuildRows_ += numRows;
     }
   }
@@ -1077,7 +1086,13 @@ bool HashTable<ignoreNullKeys>::insertBatch(
     return false;
   }
   if (isJoinBuild_) {
-    insertForJoin(rows(), groups, hashes.data(), numGroups);
+    insertForJoin(
+        rows(),
+        groups,
+        hashes.data(),
+        numGroups,
+        nullptr,
+        &rows_->stringAllocator());
   } else {
     insertForGroupBy(groups, hashes.data(), numGroups);
   }
@@ -1137,12 +1152,16 @@ void HashTable<ignoreNullKeys>::insertForGroupBy(
 }
 
 template <bool ignoreNullKeys>
-bool HashTable<ignoreNullKeys>::arrayPushRow(char* row, int32_t index) {
+bool HashTable<ignoreNullKeys>::arrayPushRow(
+    RowContainer* rows,
+    char* row,
+    int32_t index,
+    HashStringAllocator* allocator) {
   auto* existingRow = table_[index];
   if (existingRow != nullptr) {
     if (nextOffset_ > 0) {
       hasDuplicates_ = true;
-      rows_->appendNextRow(existingRow, row);
+      rows->appendNextRow(existingRow, row, allocator);
     }
     return false;
   }
@@ -1154,10 +1173,11 @@
 template <bool ignoreNullKeys>
 void HashTable<ignoreNullKeys>::pushNext(
     RowContainer* rows,
     char* row,
-    char* next) {
+    char* next,
+    HashStringAllocator* allocator) {
   VELOX_CHECK_GT(nextOffset_, 0);
   hasDuplicates_ = true;
-  rows->appendNextRow(row, next);
+  rows->appendNextRow(row, next, allocator);
 }
 
 template <bool ignoreNullKeys>
@@ -1168,7 +1188,8 @@ FOLLY_ALWAYS_INLINE void HashTable<ignoreNullKeys>::buildFullProbe(
     uint64_t hash,
     char* inserted,
     bool extraCheck,
-    TableInsertPartitionInfo* partitionInfo) {
+    TableInsertPartitionInfo* partitionInfo,
+    HashStringAllocator* allocator) {
   constexpr int32_t kKeyOffset =
      -static_cast<int32_t>(sizeof(normalized_key_t));
   auto insertFn = [&](int32_t /*row*/, PartitionBoundIndexType index) {
@@ -1187,7 +1208,7 @@ FOLLY_ALWAYS_INLINE void HashTable<ignoreNullKeys>::buildFullProbe(
           if (RowContainer::normalizedKey(group) ==
              RowContainer::normalizedKey(inserted)) {
             if (nextOffset_ > 0) {
-              pushNext(rows, group, inserted);
+              pushNext(rows, group, inserted, allocator);
             }
             return true;
           }
@@ -1204,7 +1225,7 @@ FOLLY_ALWAYS_INLINE void HashTable<ignoreNullKeys>::buildFullProbe(
         [&](char* group, int32_t /*row*/) {
           if (compareKeys(group, inserted)) {
             if (nextOffset_ > 0) {
-              pushNext(rows, group, inserted);
+              pushNext(rows, group, inserted, allocator);
             }
             return true;
           }
@@ -1224,7 +1245,8 @@ FOLLY_ALWAYS_INLINE void HashTable<ignoreNullKeys>::insertForJoinWithPrefetch(
     char** groups,
     uint64_t* hashes,
     int32_t numGroups,
-    TableInsertPartitionInfo* partitionInfo) {
+    TableInsertPartitionInfo* partitionInfo,
+    HashStringAllocator* allocator) {
   auto i = 0;
   ProbeState states[kPrefetchSize];
   constexpr int32_t kKeyOffset =
@@ -1244,14 +1266,20 @@ FOLLY_ALWAYS_INLINE void HashTable<ignoreNullKeys>::insertForJoinWithPrefetch(
     for (int32_t j = 0; j < kPrefetchSize; ++j) {
       auto index = i + j;
       buildFullProbe(
-          rows, states[j], hashes[index], groups[index], j != 0, partitionInfo);
+          rows,
+          states[j],
+          hashes[index],
+          groups[index],
+          j != 0,
+          partitionInfo,
+          allocator);
     }
   }
   for (; i < numGroups; ++i) {
     states[0].preProbe(*this, hashes[i], i);
     states[0].firstProbe(*this, keyOffset);
     buildFullProbe(
-        rows, states[0], hashes[i], groups[i], false, partitionInfo);
+        rows, states[0], hashes[i], groups[i], false, partitionInfo, allocator);
   }
 }
@@ -1261,23 +1289,24 @@ void HashTable<ignoreNullKeys>::insertForJoin(
     char** groups,
     uint64_t* hashes,
     int32_t numGroups,
-    TableInsertPartitionInfo* partitionInfo) {
+    TableInsertPartitionInfo* partitionInfo,
+    HashStringAllocator* allocator) {
   // The insertable rows are in the table, all get put in the hash table or
   // array.
   if (hashMode_ == HashMode::kArray) {
     for (auto i = 0; i < numGroups; ++i) {
       auto index = hashes[i];
       VELOX_CHECK_LT(index, capacity_);
-      arrayPushRow(groups[i], index);
+      arrayPushRow(rows, groups[i], index, allocator);
     }
     return;
   }
   if (hashMode_ == HashMode::kNormalizedKey) {
     insertForJoinWithPrefetch<true>(
-        rows, groups, hashes, numGroups, partitionInfo);
+        rows, groups, hashes, numGroups, partitionInfo, allocator);
   } else {
     insertForJoinWithPrefetch<false>(
-        rows, groups, hashes, numGroups, partitionInfo);
+        rows, groups, hashes, numGroups, partitionInfo, allocator);
   }
 }
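The thread-safety rationale behind `joinInsertAllocators_`: `HashStringAllocator` is not safe for concurrent use, and under parallel join build each partition thread may append duplicate-row vectors concurrently. The routing the patch sets up, as a sketch (`allocatorFor` is a hypothetical helper; the names follow the patch):

```cpp
// Partition 0 (the main/last build) keeps using the main row container's
// allocator; every other partition gets the dedicated allocator created in
// parallelJoinBuild(), so no two build threads share one allocator.
HashStringAllocator* allocatorFor(int partition) {
  return partition == 0 ? &rows_->stringAllocator()
                        : joinInsertAllocators_[partition - 1].get();
}
```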
diff --git a/velox/exec/HashTable.h b/velox/exec/HashTable.h
index 4726960a2b6f..549733178d90 100644
--- a/velox/exec/HashTable.h
+++ b/velox/exec/HashTable.h
@@ -460,14 +460,7 @@ class HashTable : public BaseHashTable {
       memory::MemoryPool* pool,
       const std::shared_ptr<HashStringAllocator>& stringArena = nullptr);
 
-  ~HashTable() override {
-    if (otherTables_.size() > 0) {
-      rows_->clearNextRowVectors();
-      for (auto i = 0; i < otherTables_.size(); ++i) {
-        otherTables_[i]->rows()->clearNextRowVectors();
-      }
-    }
-  }
+  ~HashTable() override = default;
 
   static std::unique_ptr<HashTable> createForAggregation(
       std::vector<std::unique_ptr<VectorHasher>>&& hashers,
@@ -818,13 +811,15 @@ class HashTable : public BaseHashTable {
   // partition info for parallel join table build. It specifies the first and
   // (exclusive) last indexes of the insert entries in the table. If a row
   // can't be inserted within this range, it is not inserted but rather added
-  // to the end of 'overflows' in 'partitionInfo'.
+  // to the end of 'overflows' in 'partitionInfo'. 'allocator' is provided for
+  // duplicate row vector allocations.
   void insertForJoin(
       RowContainer* rows,
       char** groups,
       uint64_t* hashes,
       int32_t numGroups,
-      TableInsertPartitionInfo* = nullptr);
+      TableInsertPartitionInfo* partitionInfo,
+      HashStringAllocator* allocator);
 
   // Inserts 'numGroups' entries into 'this'. 'groups' point to
   // contents in a RowContainer owned by 'this'. 'hashes' are the hash
@@ -903,16 +898,25 @@ class HashTable : public BaseHashTable {
       const std::vector<int32_t>& columns,
       const char* row) const;
 
-  // Adds a row to a hash join table in kArray hash mode. Returns true
-  // if a new entry was made and false if the row was added to an
-  // existing set of rows with the same key.
-  bool arrayPushRow(char* row, int32_t index);
-
-  // Adds a row to a hash join build side entry with multiple rows
-  // with the same key.
-  // 'rows' should be the same as the one in hash table except for
-  // 'parallelJoinBuild'.
-  void pushNext(RowContainer* rows, char* row, char* next);
+  // Adds a row to a hash join table in kArray hash mode. Returns true if a new
+  // entry was made and false if the row was added to an existing set of rows
+  // with the same key. 'allocator' is provided for duplicate row vector
+  // allocations.
+  bool arrayPushRow(
+      RowContainer* rows,
+      char* row,
+      int32_t index,
+      HashStringAllocator* allocator);
+
+  // Adds a row to a hash join build side entry with multiple rows with the
+  // same key. 'rows' should be the same as the one in the hash table except
+  // for 'parallelJoinBuild'. 'allocator' is provided for duplicate row vector
+  // allocations.
+  void pushNext(
+      RowContainer* rows,
+      char* row,
+      char* next,
+      HashStringAllocator* allocator);
 
   // Finishes inserting an entry into a join hash table. If 'partitionInfo' is
   // not null and the insert falls out-side of the partition range, then insert
@@ -924,7 +928,8 @@ class HashTable : public BaseHashTable {
       uint64_t hash,
       char* row,
       bool extraCheck,
-      TableInsertPartitionInfo* partitionInfo);
+      TableInsertPartitionInfo* partitionInfo,
+      HashStringAllocator* allocator);
 
   template <bool isNormalizedKeyMode>
   void insertForJoinWithPrefetch(
@@ -932,7 +937,8 @@ class HashTable : public BaseHashTable {
       char** groups,
       uint64_t* hashes,
       int32_t numGroups,
-      TableInsertPartitionInfo* partitionInfo);
+      TableInsertPartitionInfo* partitionInfo,
+      HashStringAllocator* allocator);
 
   // Updates 'hashers_' to correspond to the keys in the
   // content. Returns true if all hashers offer a mapping to value ids
@@ -1046,6 +1052,10 @@ class HashTable : public BaseHashTable {
   // Owns the memory of multiple build side hash join tables that are
   // combined into a single probe hash table.
   std::vector<std::unique_ptr<HashTable<ignoreNullKeys>>> otherTables_;
+  // Allocators used for duplicate row vector allocations under parallel join
+  // insert, one per parallel join partition. They all allocate memory from
+  // the memory pool of the top level row container.
+  std::vector<std::unique_ptr<HashStringAllocator>> joinInsertAllocators_;
 
   // Statistics maintained if kTrackLoads is set.
 
   // Flags indicate whether the same column in all build-side join hash tables
diff --git a/velox/exec/Operator.cpp b/velox/exec/Operator.cpp
index 67afed0eafa3..f4ed6b1ece3d 100644
--- a/velox/exec/Operator.cpp
+++ b/velox/exec/Operator.cpp
@@ -657,12 +657,6 @@ uint64_t Operator::MemoryReclaimer::reclaim(
     memory::ScopedReclaimedBytesRecorder recoder(pool, &reclaimedBytes);
     op_->reclaim(targetBytes, stats);
   }
-  // NOTE: the parallel hash build is running at the background thread
-  // pool which won't stop during memory reclamation so the operator's
-  // memory usage might increase in such case. memory usage.
-  if (op_->operatorType() == "HashBuild") {
-    reclaimedBytes = std::max<int64_t>(0, reclaimedBytes);
-  }
   VELOX_CHECK_GE(
       reclaimedBytes,
       0,
If variable // width data is referenced, this is done with StringView(for VARCHAR) and @@ -392,13 +392,15 @@ int32_t RowContainer::findRows(folly::Range rows, char** result) { return numRows; } -void RowContainer::appendNextRow(char* current, char* nextRow) { +void RowContainer::appendNextRow( + char* current, + char* nextRow, + HashStringAllocator* allocator) { VELOX_CHECK(getNextRowVector(nextRow) == nullptr); NextRowVector*& nextRowArrayPtr = getNextRowVector(current); if (nextRowArrayPtr == nullptr) { - nextRowArrayPtr = - new (stringAllocator_->allocate(kNextRowVectorSize)->begin()) - NextRowVector(StlAllocator(stringAllocator_.get())); + nextRowArrayPtr = new (allocator->allocate(kNextRowVectorSize)->begin()) + NextRowVector(StlAllocator(allocator)); hasDuplicateRows_ = true; nextRowArrayPtr->emplace_back(current); } @@ -935,8 +937,7 @@ void RowContainer::hash( void RowContainer::clear() { const bool sharedStringAllocator = !stringAllocator_.unique(); - if (checkFree_ || sharedStringAllocator || usesExternalMemory_ || - hasDuplicateRows_) { + if (sharedStringAllocator || usesExternalMemory_) { constexpr int32_t kBatch = 1000; std::vector rows(kBatch); RowContainerIterator iter; @@ -944,11 +945,10 @@ void RowContainer::clear() { freeRowsExtraMemory(folly::Range(rows.data(), numRows), true); } } + hasDuplicateRows_ = false; + rows_.clear(); if (!sharedStringAllocator) { - if (checkFree_) { - stringAllocator_->checkEmpty(); - } stringAllocator_->clear(); } numRows_ = 0; @@ -958,18 +958,6 @@ void RowContainer::clear() { firstFreeRow_ = nullptr; } -void RowContainer::clearNextRowVectors() { - if (hasDuplicateRows_) { - constexpr int32_t kBatch = 1000; - std::vector rows(kBatch); - RowContainerIterator iter; - while (auto numRows = listRows(&iter, kBatch, rows.data())) { - freeNextRowVectors(folly::Range(rows.data(), numRows), true); - } - hasDuplicateRows_ = false; - } -} - void RowContainer::setProbedFlag(char** rows, int32_t numRows) { for (auto i = 0; i < numRows; i++) { // Row may be null in case of a FULL join. diff --git a/velox/exec/RowContainer.h b/velox/exec/RowContainer.h index 8c7f7b56805e..5fe625636e4c 100644 --- a/velox/exec/RowContainer.h +++ b/velox/exec/RowContainer.h @@ -686,8 +686,10 @@ class RowContainer { /// Creates a next-row-vector if it doesn't exist. Appends the row address to /// the next-row-vector, and store the address of the next-row-vector in the - /// 'nextOffset_' slot for all duplicate rows. - void appendNextRow(char* current, char* nextRow); + /// 'nextOffset_' slot for all duplicate rows. 'allocator' is provided for the + /// duplicate row vector allocations. + void + appendNextRow(char* current, char* nextRow, HashStringAllocator* allocator); NextRowVector*& getNextRowVector(char* row) const { return *reinterpret_cast(row + nextOffset_); @@ -725,9 +727,6 @@ class RowContainer { /// Resets the state to be as after construction. Frees memory for payload. void clear(); - /// Frees memory for next row vectors. - void clearNextRowVectors(); - int32_t compareRows( const char* left, const char* right, @@ -808,10 +807,6 @@ class RowContainer { return mutable_; } - bool checkFree() const { - return checkFree_; - } - /// Returns a summary of the container: key types, dependent types, number of /// accumulators and number of rows. 
diff --git a/velox/exec/RowContainer.h b/velox/exec/RowContainer.h
index 8c7f7b56805e..5fe625636e4c 100644
--- a/velox/exec/RowContainer.h
+++ b/velox/exec/RowContainer.h
@@ -686,8 +686,10 @@ class RowContainer {
   /// Creates a next-row-vector if it doesn't exist. Appends the row address to
   /// the next-row-vector, and store the address of the next-row-vector in the
-  /// 'nextOffset_' slot for all duplicate rows.
-  void appendNextRow(char* current, char* nextRow);
+  /// 'nextOffset_' slot for all duplicate rows. 'allocator' is provided for
+  /// duplicate row vector allocations.
+  void
+  appendNextRow(char* current, char* nextRow, HashStringAllocator* allocator);
 
   NextRowVector*& getNextRowVector(char* row) const {
     return *reinterpret_cast<NextRowVector**>(row + nextOffset_);
   }
@@ -725,9 +727,6 @@ class RowContainer {
   /// Resets the state to be as after construction. Frees memory for payload.
   void clear();
 
-  /// Frees memory for next row vectors.
-  void clearNextRowVectors();
-
   int32_t compareRows(
       const char* left,
       const char* right,
@@ -808,10 +807,6 @@ class RowContainer {
     return mutable_;
   }
 
-  bool checkFree() const {
-    return checkFree_;
-  }
-
   /// Returns a summary of the container: key types, dependent types, number of
   /// accumulators and number of rows.
   std::string toString() const;
@@ -1281,7 +1276,6 @@ class RowContainer {
       CompareFlags flags = CompareFlags());
 
   // Free variable-width fields at column `column_index` associated with the
-  // 'rows', and if 'checkFree_' is true, zero out complex-typed field in
   // 'rows'. `FieldType` is the type of data representation of the fields in
   // row, and can be one of StringView(represents VARCHAR) and
   // std::string_view(represents ARRAY, MAP or ROW).
@@ -1310,9 +1304,6 @@ class RowContainer {
       }
     }
     stringAllocator_->free(HashStringAllocator::headerOf(view.data()));
-    if (checkFree_) {
-      view = FieldType();
-    }
   }
@@ -1334,11 +1325,12 @@ class RowContainer {
     columnHasNulls_[columnIndex] = columnHasNulls_[columnIndex] || hasNulls;
   }
 
-  const bool checkFree_ = false;
-
   const std::vector<TypePtr> keyTypes_;
   const bool nullableKeys_;
   const bool isJoinBuild_;
+  // True if normalized keys are enabled in initial state.
+  const bool hasNormalizedKeys_;
+  const std::shared_ptr<HashStringAllocator> stringAllocator_;
 
   std::vector<bool> columnHasNulls_;
@@ -1355,6 +1347,8 @@ class RowContainer {
   std::vector<TypePtr> types_;
   std::vector<TypeKind> typeKinds_;
   int32_t nextOffset_ = 0;
+  // Indicates if this row container has rows with duplicate keys. This only
+  // applies if 'nextOffset_' is set.
   bool hasDuplicateRows_{false};
   // Bit position of null bit in the row. 0 if no null flag. Order is keys,
   // accumulators, dependent.
@@ -1375,8 +1369,6 @@ class RowContainer {
   int32_t fixedRowSize_;
   // How many bytes do the flags (null, probed, free) occupy.
   int32_t flagBytes_;
-  // True if normalized keys are enabled in initial state.
-  const bool hasNormalizedKeys_;
   // The count of entries that have an extra normalized_key_t before the
   // start.
   int64_t numRowsWithNormalizedKey_ = 0;
@@ -1395,7 +1387,6 @@ class RowContainer {
   uint64_t numFreeRows_ = 0;
 
   memory::AllocationPool rows_;
-  std::shared_ptr<HashStringAllocator> stringAllocator_;
 
   int alignment_ = 1;
 };
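From the caller's perspective, the allocator for the next-row-vector is now explicit. A usage sketch mirroring the single-container form exercised by the updated `RowContainerTest` later in this patch:

```cpp
// Link two rows that share a key; the NextRowVector is allocated from the
// container's own string allocator. A parallel join build would pass a
// partition-local allocator here instead.
char* row = data->newRow();
char* dup = data->newRow();
data->appendNextRow(row, dup, &data->stringAllocator());
```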
diff --git a/velox/exec/benchmarks/HashJoinListResultBenchmark.cpp b/velox/exec/benchmarks/HashJoinListResultBenchmark.cpp
index aba2a254068b..6945900aee89 100644
--- a/velox/exec/benchmarks/HashJoinListResultBenchmark.cpp
+++ b/velox/exec/benchmarks/HashJoinListResultBenchmark.cpp
@@ -166,15 +166,18 @@ struct HashTableBenchmarkResult {
 
   double eraseClock{0};
 
+  uint64_t peakMemoryBytes{0};
+
   // The mode of the table.
   BaseHashTable::HashMode hashMode;
 
-  void merge(HashTableBenchmarkResult other) {
-    numIter++;
+  void merge(const HashTableBenchmarkResult& other) {
+    ++numIter;
     buildClocks += other.buildClocks;
     listJoinResultClocks += other.listJoinResultClocks;
     totalClock += other.totalClock;
     eraseClock += other.eraseClock;
+    peakMemoryBytes += other.peakMemoryBytes;
   }
 
   std::string toString() const {
@@ -186,7 +189,8 @@ struct HashTableBenchmarkResult {
         << " listJoinResultClocks=" << listJoinResultClocks << "("
         << (listJoinResultClocks / totalClock * 100)
         << "%) buildClocks=" << buildClocks << "("
-        << (buildClocks / totalClock * 100) << "%)";
+        << (buildClocks / totalClock * 100) << "%)"
+        << " peakMemoryBytes=" << succinctBytes(peakMemoryBytes);
     if (params.runErase) {
       out << " eraseClock=" << eraseClock << "("
           << (eraseClock / totalClock * 100) << "%)";
@@ -201,13 +205,21 @@ class HashTableListJoinResultBenchmark : public VectorTestBase {
       : randomEngine_((std::random_device{}())) {}
 
   HashTableBenchmarkResult run(HashTableBenchmarkParams params) {
+    std::shared_ptr<memory::MemoryPool> tableAggregatePool =
+        rootPool_->addAggregateChild("tableAggregate");
+    std::vector<std::shared_ptr<memory::MemoryPool>> tablePools;
+    tablePools.reserve(params.numTables);
+    for (int i = 0; i < params.numTables; ++i) {
+      tablePools.push_back(
+          tableAggregatePool->addLeafChild(fmt::format("table{}", i)));
+    }
     params_ = params;
     HashTableBenchmarkResult result;
     result.params = params_;
-    SelectivityInfo totalClock;
+    uint64_t totalClocks{0};
     {
-      SelectivityTimer timer(totalClock, 0);
-      buildTable();
+      ClockTimer timer(totalClocks);
+      buildTable(tablePools);
       result.numOutput = probeTableAndListResult();
       result.hashMode = topTable_->hashMode();
       VELOX_CHECK_EQ(result.hashMode, params_.mode);
@@ -219,8 +231,8 @@ class HashTableListJoinResultBenchmark : public VectorTestBase {
     }
     result.buildClocks += buildTime_;
     result.listJoinResultClocks += listJoinResultTime_;
-    result.totalClock += totalClock.timeToDropValue();
-
+    result.totalClock = totalClocks;
+    result.peakMemoryBytes = tableAggregatePool->peakBytes();
     return result;
   }
@@ -348,7 +360,8 @@ class HashTableListJoinResultBenchmark : public VectorTestBase {
   }
 
   // Prepare join table.
-  void buildTable() {
+  void buildTable(
+      const std::vector<std::shared_ptr<memory::MemoryPool>>& tablePools) {
     std::vector<TypePtr> dependentTypes;
     std::vector<std::unique_ptr<BaseHashTable>> otherTables;
     std::vector<RowVectorPtr> batches;
@@ -363,7 +376,7 @@ class HashTableListJoinResultBenchmark : public VectorTestBase {
           true,
           false,
           1'000,
-          pool_.get());
+          tablePools[i].get());
       copyVectorsToTable(batches[i], table.get());
 
       if (i == 0) {
@@ -372,15 +385,15 @@ class HashTableListJoinResultBenchmark : public VectorTestBase {
         otherTables.push_back(std::move(table));
       }
     }
-    SelectivityInfo buildClocks;
+    uint64_t buildClocks{0};
     {
-      SelectivityTimer timer(buildClocks, 0);
+      ClockTimer timer(buildClocks);
       topTable_->prepareJoinTable(
          std::move(otherTables),
          BaseHashTable::kNoSpillInputStartPartitionBit,
          executor_.get());
     }
-    buildTime_ = buildClocks.timeToDropValue();
+    buildTime_ = buildClocks;
   }
 
   void probeTable(
@@ -428,9 +441,8 @@ class HashTableListJoinResultBenchmark : public VectorTestBase {
   // Hash probe and list join result.
   int64_t probeTableAndListResult() {
     auto lookup = std::make_unique<HashLookup>(topTable_->hashers());
-    auto numBatch = params_.probeSize / params_.hashTableSize;
-    auto batchSize = params_.hashTableSize;
-    SelectivityInfo listJoinResultClocks;
+    const auto numBatch = params_.probeSize / params_.hashTableSize;
+    const auto batchSize = params_.hashTableSize;
     BufferPtr outputRowMapping;
     auto outputBatchSize = batchSize;
     std::vector<char*> outputTableRows;
@@ -444,8 +456,9 @@ class HashTableListJoinResultBenchmark : public VectorTestBase {
       auto mapping = initializeRowNumberMapping(
          outputRowMapping, outputBatchSize, pool_.get());
       outputTableRows.resize(outputBatchSize);
+      uint64_t listJoinResultClocks{0};
       {
-        SelectivityTimer timer(listJoinResultClocks, 0);
+        ClockTimer timer(listJoinResultClocks);
         while (!resultsIter.atEnd()) {
           numJoinListResult += topTable_->listJoinResults(
              resultsIter,
@@ -455,8 +468,8 @@ class HashTableListJoinResultBenchmark : public VectorTestBase {
              std::numeric_limits<uint64_t>::max());
         }
       }
+      listJoinResultTime_ += listJoinResultClocks;
     }
-    listJoinResultTime_ = listJoinResultClocks.timeToDropValue();
     return numJoinListResult;
   }
 
@@ -464,7 +477,6 @@ class HashTableListJoinResultBenchmark : public VectorTestBase {
     auto lookup = std::make_unique<HashLookup>(topTable_->hashers());
     auto batchSize = 10000;
     auto mode = topTable_->hashMode();
-    SelectivityInfo eraseClock;
     BufferPtr outputRowMapping;
     auto outputBatchSize = topTable_->rows()->numRows() + 2;
     std::vector<char*> outputTableRows;
@@ -482,12 +494,13 @@ class HashTableListJoinResultBenchmark : public VectorTestBase {
         mapping,
         folly::Range<char**>(outputTableRows.data(), outputTableRows.size()),
         std::numeric_limits<uint64_t>::max());
+    uint64_t eraseClocks{0};
     {
-      SelectivityTimer timer(eraseClock, 0);
+      ClockTimer timer(eraseClocks);
       topTable_->rows()->eraseRows(
          folly::Range<char**>(outputTableRows.data(), num));
     }
-    eraseTime_ += eraseClock.timeToDropValue();
+    eraseTime_ = eraseClocks;
   }
 
   std::default_random_engine randomEngine_;
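On the timing change running through the benchmark above: `SelectivityInfo`/`SelectivityTimer` pairs are replaced by plain `uint64_t` counters with `ClockTimer`. Assuming the usual Velox `ClockTimer` semantics (the destructor adds the elapsed time to the referenced counter), repeated scopes accumulate naturally with `+=`. The pattern, as a sketch:

```cpp
uint64_t clocks{0};
{
  ClockTimer timer(clocks); // starts timing on construction
  runProbe();               // hypothetical work under measurement
} // destructor adds the elapsed time to 'clocks'
listJoinResultTime_ += clocks;
```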
diff --git a/velox/exec/tests/RowContainerTest.cpp b/velox/exec/tests/RowContainerTest.cpp
index 26a666fe2d81..129efad3411e 100644
--- a/velox/exec/tests/RowContainerTest.cpp
+++ b/velox/exec/tests/RowContainerTest.cpp
@@ -1951,15 +1951,15 @@ TEST_F(RowContainerTest, nextRowVector) {
   };
 
   auto validateNextRowVector = [&]() {
-    for (int i = 0; i < rows.size(); i++) {
+    for (int i = 0; i < rows.size(); ++i) {
       auto vector = data->getNextRowVector(rows[i]);
       if (vector) {
         auto iter = std::find(vector->begin(), vector->end(), rows[i]);
-        EXPECT_NE(iter, vector->end());
-        EXPECT_TRUE(vector->size() <= 2 && vector->size() > 0);
+        ASSERT_NE(iter, vector->end());
+        ASSERT_TRUE(vector->size() <= 2 && vector->size() > 0);
         for (auto next : *vector) {
-          EXPECT_EQ(data->getNextRowVector(next), vector);
-          EXPECT_TRUE(std::find(rows.begin(), rows.end(), next) != rows.end());
+          ASSERT_EQ(data->getNextRowVector(next), vector);
+          ASSERT_TRUE(std::find(rows.begin(), rows.end(), next) != rows.end());
         }
       }
     }
@@ -1969,18 +1969,18 @@ TEST_F(RowContainerTest, nextRowVector) {
     for (int i = 0; i < numRows; ++i) {
       rows.push_back(data->newRow());
       rowSet.insert(rows.back());
-      EXPECT_EQ(data->getNextRowVector(rows.back()), nullptr);
+      ASSERT_EQ(data->getNextRowVector(rows.back()), nullptr);
     }
-    EXPECT_EQ(numRows, data->numRows());
+    ASSERT_EQ(numRows, data->numRows());
     std::vector<char*> rowsFromContainer(numRows);
     RowContainerIterator iter;
-    EXPECT_EQ(
+    ASSERT_EQ(
        data->listRows(&iter, numRows, rowsFromContainer.data()), numRows);
-    EXPECT_EQ(0, data->listRows(&iter, numRows, rows.data()));
-    EXPECT_EQ(rows, rowsFromContainer);
+    ASSERT_EQ(0, data->listRows(&iter, numRows, rows.data()));
+    ASSERT_EQ(rows, rowsFromContainer);
 
     for (int i = 0; i + 2 <= numRows; i += 2) {
-      data->appendNextRow(rows[i], rows[i + 1]);
+      data->appendNextRow(rows[i], rows[i + 1], &data->stringAllocator());
     }
     validateNextRowVector();
   };
@@ -2008,7 +2008,6 @@ TEST_F(RowContainerTest, nextRowVector) {
   std::vector<int32_t> eraseRows(numRows);
   std::iota(eraseRows.begin(), eraseRows.end(), 0);
   nextRowVectorEraseValidation(eraseRows);
-
   VELOX_ASSERT_THROW(
      nextRowVectorEraseValidation({1}),
      "All rows with the same keys must be present in 'rows'");