Skip to content

Commit

Permalink
enable clear of hashtable
Browse files Browse the repository at this point in the history
  • Loading branch information
MrPresent-Han committed Nov 15, 2024
1 parent 8bdfd11 commit 1b0a0c1
Show file tree
Hide file tree
Showing 8 changed files with 63 additions and 91 deletions.
9 changes: 7 additions & 2 deletions internal/core/src/exec/HashTable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ void HashTable<nullableKeys>::allocateTables(uint64_t size) {
// The total size is 8 bytes per slot, in groups of 16 slots with 16 bytes of
// tags and 16 * 6 bytes of pointers and a padding of 16 bytes to round up the
// cache line.
// TODO must support memory pool here to avoid OOM
// TODO support memory pool here to avoid OOM
table_ = new char*[capacity_];
memset(table_, 0, capacity_ * sizeof(char*));
}

Check warning on line 208 in internal/core/src/exec/HashTable.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/exec/HashTable.cpp#L206-L208

Added lines #L206 - L208 were not covered by tests
Expand Down Expand Up @@ -330,7 +330,12 @@ void HashTable<nullableKeys>::setHashMode(HashMode mode, int32_t numNew) {

template <bool nullable>
void HashTable<nullable>::clear(bool freeTable) {

if(table_) {
delete[] table_;
table_ = nullptr;

Check warning on line 335 in internal/core/src/exec/HashTable.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/exec/HashTable.cpp#L332-L335

Added lines #L332 - L335 were not covered by tests
}
rows_->clear();
numDistinct_ = 0;
}

Check warning on line 339 in internal/core/src/exec/HashTable.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/exec/HashTable.cpp#L337-L339

Added lines #L337 - L339 were not covered by tests

template class HashTable<true>;
Expand Down
6 changes: 4 additions & 2 deletions internal/core/src/exec/HashTable.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,6 @@ virtual void clear(bool freeTable = false) = 0;
protected:
std::vector<std::unique_ptr<VectorHasher>> hashers_;
std::unique_ptr<RowContainer> rows_;
char** table_ = nullptr;
};

class ProbeState;
Expand All @@ -158,7 +157,7 @@ class HashTable : public BaseHashTable {
keyTypes.push_back(hasher->ChannelDataType());

Check warning on line 157 in internal/core/src/exec/HashTable.h

View check run for this annotation

Codecov / codecov/patch

internal/core/src/exec/HashTable.h#L154-L157

Added lines #L154 - L157 were not covered by tests
}
hashMode_ = HashMode::kHash;
rows_ = std::make_unique<RowContainer>(keyTypes, accumulators, nullableKeys, hashMode_ != HashMode::kHash);
rows_ = std::make_unique<RowContainer>(keyTypes, accumulators, nullableKeys);
};

Check warning on line 161 in internal/core/src/exec/HashTable.h

View check run for this annotation

Codecov / codecov/patch

internal/core/src/exec/HashTable.h#L159-L161

Added lines #L159 - L161 were not covered by tests

void setHashMode(HashMode mode, int32_t numNew) override;
Expand Down Expand Up @@ -242,6 +241,8 @@ class HashTable : public BaseHashTable {
// a power of 2.
void allocateTables(uint64_t size);

void extractGroups(char** output_groups, size_t group_count);

template<bool isJoin, bool isNormalizedKey = false>
void fullProbe(HashLookup& lookup, ProbeState& state, bool extraCheck);

Expand Down Expand Up @@ -287,6 +288,7 @@ class HashTable : public BaseHashTable {
int8_t sizeBits_;

int64_t numRehashes_{0};
char** table_ = nullptr;

HashMode hashMode() const override {
return hashMode_;

Check warning on line 294 in internal/core/src/exec/HashTable.h

View check run for this annotation

Codecov / codecov/patch

internal/core/src/exec/HashTable.h#L293-L294

Added lines #L293 - L294 were not covered by tests
Expand Down
3 changes: 1 addition & 2 deletions internal/core/src/exec/operator/AggregationNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,8 @@ RowVectorPtr PhyAggregationNode::GetOutput() {
auto batch_size = queryConfig->get_expr_batch_size();
const auto outputRowCount = isGlobal_? 1: batch_size;
prepareOutput(outputRowCount);
const bool hasData = grouping_set_->getOutput(outputRowCount, outputRowCount, resultIterator_, output_);
const bool hasData = grouping_set_->getOutput(output_);
if (!hasData) {
resultIterator_.reset();
if (no_more_input_) {
finished_ = true;

Check warning on line 42 in internal/core/src/exec/operator/AggregationNode.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/exec/operator/AggregationNode.cpp#L35-L42

Added lines #L35 - L42 were not covered by tests
}
Expand Down
2 changes: 0 additions & 2 deletions internal/core/src/exec/operator/AggregationNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,6 @@ class PhyAggregationNode: public Operator {
// flush.
int64_t numOutputRows_ = 0;
bool finished_ = false;

RowContainerIterator resultIterator_;
};
}
}
29 changes: 11 additions & 18 deletions internal/core/src/exec/operator/query-agg/GroupingSet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,37 +95,30 @@ void GroupingSet::addGlobalAggregationInput(const milvus::RowVectorPtr& input) {
tempVectors_.clear();
}

Check warning on line 96 in internal/core/src/exec/operator/query-agg/GroupingSet.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/exec/operator/query-agg/GroupingSet.cpp#L95-L96

Added lines #L95 - L96 were not covered by tests

bool GroupingSet::getGlobalAggregationOutput(milvus::exec::RowContainerIterator &iterator,
milvus::RowVectorPtr &result) {
if (iterator.allocationIndex != 0) {
return false;
}
bool GroupingSet::getGlobalAggregationOutput(milvus::RowVectorPtr &result) {
initializeGlobalAggregation();
auto groups = lookup_->hits_.data();
for(auto i = 0; i < aggregates_.size(); i++) {
auto& function = aggregates_[i].function_;
auto resultVector = result->child(aggregates_[i].output_);
function->extractValues(groups, 1, &resultVector);

Check warning on line 104 in internal/core/src/exec/operator/query-agg/GroupingSet.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/exec/operator/query-agg/GroupingSet.cpp#L98-L104

Added lines #L98 - L104 were not covered by tests
}
iterator.allocationIndex = std::numeric_limits<int32_t>::max();
return true;

Check warning on line 106 in internal/core/src/exec/operator/query-agg/GroupingSet.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/exec/operator/query-agg/GroupingSet.cpp#L106

Added line #L106 was not covered by tests
}

bool GroupingSet::getOutput(int32_t maxOutputRows, int32_t maxOutputBytes, milvus::exec::RowContainerIterator &iterator,
milvus::RowVectorPtr &result) {
bool GroupingSet::getOutput(milvus::RowVectorPtr &result) {
if (hash_table_ == nullptr) {

Check warning on line 110 in internal/core/src/exec/operator/query-agg/GroupingSet.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/exec/operator/query-agg/GroupingSet.cpp#L109-L110

Added lines #L109 - L110 were not covered by tests
return false;
}
if (isGlobal_) {
return getGlobalAggregationOutput(iterator, result);
}
char* groups[maxOutputRows];
const int32_t numGroups = hash_table_?hash_table_->rows()
->listRows(&iterator, maxOutputRows, maxOutputBytes, groups):0;
if(numGroups == 0) {
if (hash_table_ != nullptr) {
hash_table_->clear();
}
return getGlobalAggregationOutput(result);

Check warning on line 114 in internal/core/src/exec/operator/query-agg/GroupingSet.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/exec/operator/query-agg/GroupingSet.cpp#L113-L114

Added lines #L113 - L114 were not covered by tests
}
const auto& all_rows = hash_table_->rows()->allRows();
if(all_rows.empty()) {
hash_table_->clear();
return false;

Check warning on line 119 in internal/core/src/exec/operator/query-agg/GroupingSet.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/exec/operator/query-agg/GroupingSet.cpp#L116-L119

Added lines #L116 - L119 were not covered by tests
}
extractGroups(folly::Range<char**>(groups, numGroups), result);
extractGroups(folly::Range<char**>(const_cast<char**>(all_rows.data()), all_rows.size()), result);
return true;

Check warning on line 122 in internal/core/src/exec/operator/query-agg/GroupingSet.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/exec/operator/query-agg/GroupingSet.cpp#L121-L122

Added lines #L121 - L122 were not covered by tests
}

Expand Down
7 changes: 2 additions & 5 deletions internal/core/src/exec/operator/query-agg/GroupingSet.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,7 @@ class GroupingSet {
// fit.
void ensureInputFits(const RowVectorPtr& input);

bool getOutput(int32_t maxOutputRows,
int32_t maxOutputBytes,
RowContainerIterator& iterator,
RowVectorPtr& result);
bool getOutput(RowVectorPtr& result);

bool hasOutput() {
return noMoreInput_;

Check warning on line 64 in internal/core/src/exec/operator/query-agg/GroupingSet.h

View check run for this annotation

Codecov / codecov/patch

internal/core/src/exec/operator/query-agg/GroupingSet.h#L64

Added line #L64 was not covered by tests
Expand All @@ -71,7 +68,7 @@ class GroupingSet {

void populateTempVectors(int32_t aggregateIndex, const RowVectorPtr& input);

bool getGlobalAggregationOutput(RowContainerIterator& iterator, RowVectorPtr& result);
bool getGlobalAggregationOutput(RowVectorPtr& result);

private:
const bool isGlobal_;
Expand Down
22 changes: 8 additions & 14 deletions internal/core/src/exec/operator/query-agg/RowContainer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,10 @@ namespace exec {

RowContainer::RowContainer(const std::vector<DataType> &keyTypes,

Check warning on line 24 in internal/core/src/exec/operator/query-agg/RowContainer.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/exec/operator/query-agg/RowContainer.cpp#L24

Added line #L24 was not covered by tests
const std::vector<Accumulator>& accumulators,
bool nullableKeys,
bool hasNormalizedKeys):
bool nullableKeys):
keyTypes_(keyTypes),
accumulators_(accumulators),
nullableKeys_(nullableKeys),
hasNormalizedKeys_(hasNormalizedKeys){
nullableKeys_(nullableKeys){
int32_t offset = 0;
int32_t nullOffset = 0;
bool isVariableWidth = false;
Expand Down Expand Up @@ -109,17 +107,13 @@ RowContainer::RowContainer(const std::vector<DataType> &keyTypes,
}

Check warning on line 107 in internal/core/src/exec/operator/query-agg/RowContainer.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/exec/operator/query-agg/RowContainer.cpp#L107

Added line #L107 was not covered by tests

char* RowContainer::newRow() {
++numRows_;
char* row;
if (firstFreeRow_) {
row = firstFreeRow_;
AssertInfo(milvus::bits::isBitSet(row, freeFlagOffset_), "freeRow must be freed before inserted into the linked list");
firstFreeRow_ = nextFree(row);
--numFreeRows_;
} else {
row = new char[fixedRowSize_ + alignment_];
char* row = new char[fixedRowSize_ + alignment_];
if (rows_.size() < numRows_ + 1) {
rows_.resize(numRows_ + 1024);

Check warning on line 112 in internal/core/src/exec/operator/query-agg/RowContainer.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/exec/operator/query-agg/RowContainer.cpp#L109-L112

Added lines #L109 - L112 were not covered by tests
}
return nullptr;
rows_[numRows_] = row;
++numRows_;
return row;

Check warning on line 116 in internal/core/src/exec/operator/query-agg/RowContainer.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/exec/operator/query-agg/RowContainer.cpp#L114-L116

Added lines #L114 - L116 were not covered by tests
}

void RowContainer::store(const milvus::ColumnVectorPtr &column_data, milvus::vector_size_t index, char *row,

Check warning on line 119 in internal/core/src/exec/operator/query-agg/RowContainer.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/exec/operator/query-agg/RowContainer.cpp#L119

Added line #L119 was not covered by tests
Expand Down
76 changes: 30 additions & 46 deletions internal/core/src/exec/operator/query-agg/RowContainer.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,28 +107,11 @@ class RowColumn {
const uint64_t packedOffsets_;
};

using normalized_key_t = uint64_t;

struct RowContainerIterator {
int32_t allocationIndex = 0;
int32_t rowOffset = 0;

char* rowBegin_{nullptr};
inline char* currentRow() const {
return rowBegin_;
}

void reset() {
*this = {};
}
};

class RowContainer {
public:
RowContainer(const std::vector<DataType>& keyTypes,
const std::vector<Accumulator>& accumulators,
bool nullableKeys,
bool hasNormalizedKeys);
bool nullableKeys);

// The number of flags (bits) per accumulator, one for null and one for
// initialized.
Expand All @@ -155,13 +138,6 @@ class RowContainer {
return rowSizeOffset_;

Check warning on line 138 in internal/core/src/exec/operator/query-agg/RowContainer.h

View check run for this annotation

Codecov / codecov/patch

internal/core/src/exec/operator/query-agg/RowContainer.h#L138

Added line #L138 was not covered by tests
}

int32_t listRows(RowContainerIterator* iter,
int32_t maxRows,
uint64_t maxBytes,
char** rows) {
return 0;
}

static inline bool isNullAt(const char* row, int32_t nullByte, uint8_t nullMask) {
return (row[nullByte] & nullMask) != 0;

Check warning on line 142 in internal/core/src/exec/operator/query-agg/RowContainer.h

View check run for this annotation

Codecov / codecov/patch

internal/core/src/exec/operator/query-agg/RowContainer.h#L142

Added line #L142 was not covered by tests
}
Expand Down Expand Up @@ -291,22 +267,23 @@ class RowContainer {
const VectorPtr& result){
auto maxRows = numRows + resultOffset;
AssertInfo(maxRows == result->size(), "extracted rows number should be equal to the size of result vector");
if constexpr (std::is_same_v<T, std::string> || std::is_same_v<T, std::string_view>) {
PanicInfo(DataTypeInvalid, "Not support extract string values for now");
} else {
auto result_column_vec = std::dynamic_pointer_cast<ColumnVector>(result);
AssertInfo(result_column_vec != nullptr, "Input column to extract result must be of ColumnVector type");
for (auto i = 0; i < numRows; i++) {
const char *row;
if constexpr (useRowNumber) {
auto rowNumber = rowNumbers[i];
row = rowNumber >= 0 ? rows[rowNumber] : nullptr;
} else {
row = rows[i];
}
auto resultIndex = resultOffset + i;
if (row == nullptr || isNullAt(row, nullByte, nullMask)) {
result_column_vec->nullAt(resultIndex);
auto result_column_vec = std::dynamic_pointer_cast<ColumnVector>(result);
AssertInfo(result_column_vec != nullptr, "Input column to extract result must be of ColumnVector type");
for (auto i = 0; i < numRows; i++) {

Check warning on line 272 in internal/core/src/exec/operator/query-agg/RowContainer.h

View check run for this annotation

Codecov / codecov/patch

internal/core/src/exec/operator/query-agg/RowContainer.h#L268-L272

Added lines #L268 - L272 were not covered by tests
const char *row;
if constexpr (useRowNumber) {
auto rowNumber = rowNumbers[i];
row = rowNumber >= 0 ? rows[rowNumber] : nullptr;

Check warning on line 276 in internal/core/src/exec/operator/query-agg/RowContainer.h

View check run for this annotation

Codecov / codecov/patch

internal/core/src/exec/operator/query-agg/RowContainer.h#L275-L276

Added lines #L275 - L276 were not covered by tests
} else {
row = rows[i];

Check warning on line 278 in internal/core/src/exec/operator/query-agg/RowContainer.h

View check run for this annotation

Codecov / codecov/patch

internal/core/src/exec/operator/query-agg/RowContainer.h#L278

Added line #L278 was not covered by tests
}
auto resultIndex = resultOffset + i;
if (row == nullptr || isNullAt(row, nullByte, nullMask)) {
result_column_vec->nullAt(resultIndex);

Check warning on line 282 in internal/core/src/exec/operator/query-agg/RowContainer.h

View check run for this annotation

Codecov / codecov/patch

internal/core/src/exec/operator/query-agg/RowContainer.h#L280-L282

Added lines #L280 - L282 were not covered by tests
} else {
if constexpr (std::is_same_v<T, std::string> || std::is_same_v<T, std::string_view>) {
auto* str_ptr = valueAt<T*>(row, offset);
result_column_vec->SetValueAt<T>(resultIndex, *str_ptr);

Check warning on line 286 in internal/core/src/exec/operator/query-agg/RowContainer.h

View check run for this annotation

Codecov / codecov/patch

internal/core/src/exec/operator/query-agg/RowContainer.h#L286

Added line #L286 was not covered by tests
} else {
result_column_vec->SetValueAt<T>(resultIndex, valueAt<T>(row, offset));

Check warning on line 288 in internal/core/src/exec/operator/query-agg/RowContainer.h

View check run for this annotation

Codecov / codecov/patch

internal/core/src/exec/operator/query-agg/RowContainer.h#L288

Added line #L288 was not covered by tests
}
Expand Down Expand Up @@ -360,7 +337,6 @@ class RowContainer {
PanicInfo(DataTypeInvalid, "Not Support Extract types:[ROW/JSON/ARRAY/NONE]");

Check warning on line 337 in internal/core/src/exec/operator/query-agg/RowContainer.h

View check run for this annotation

Codecov / codecov/patch

internal/core/src/exec/operator/query-agg/RowContainer.h#L337

Added line #L337 was not covered by tests
} else {
using T = typename milvus::TypeTraits<Type>::NativeType;

auto nullMask = column.nullMask();
auto offset = column.offset();
if (nullMask) {
Expand Down Expand Up @@ -403,6 +379,10 @@ class RowContainer {
extractColumn(rows, numRows, columnAt(column_idx), 0, result);
}

Check warning on line 380 in internal/core/src/exec/operator/query-agg/RowContainer.h

View check run for this annotation

Codecov / codecov/patch

internal/core/src/exec/operator/query-agg/RowContainer.h#L378-L380

Added lines #L378 - L380 were not covered by tests

const std::vector<char*>& allRows() const {
return rows_;
}

static inline int32_t nullByte(int32_t nullOffset) {
return nullOffset / 8;

Check warning on line 387 in internal/core/src/exec/operator/query-agg/RowContainer.h

View check run for this annotation

Codecov / codecov/patch

internal/core/src/exec/operator/query-agg/RowContainer.h#L387

Added line #L387 was not covered by tests
}
Expand All @@ -426,15 +406,20 @@ class RowContainer {
return nullMask(accumulatorFlagsOffset) << 1;

Check warning on line 406 in internal/core/src/exec/operator/query-agg/RowContainer.h

View check run for this annotation

Codecov / codecov/patch

internal/core/src/exec/operator/query-agg/RowContainer.h#L406

Added line #L406 was not covered by tests
}


void clear() {
for (auto row: rows_) {
delete row;

Check warning on line 411 in internal/core/src/exec/operator/query-agg/RowContainer.h

View check run for this annotation

Codecov / codecov/patch

internal/core/src/exec/operator/query-agg/RowContainer.h#L409-L411

Added lines #L409 - L411 were not covered by tests
}
numRows_ = 0;
numFreeRows_ = 0;
}

Check warning on line 415 in internal/core/src/exec/operator/query-agg/RowContainer.h

View check run for this annotation

Codecov / codecov/patch

internal/core/src/exec/operator/query-agg/RowContainer.h#L413-L415

Added lines #L413 - L415 were not covered by tests

private:
// Offset of the pointer to the next free row on a free row.
static constexpr uint32_t kNextFreeOffset_ = 0;

const std::vector<DataType> keyTypes_;
const bool nullableKeys_;
const bool hasNormalizedKeys_;
std::vector<uint32_t> offsets_;
std::vector<uint32_t> nullOffsets_;

Expand All @@ -456,10 +441,9 @@ class RowContainer {

std::vector<Accumulator> accumulators_;

// Head of linked list of free rows.
char* firstFreeRow_ = nullptr;
uint64_t numRows_ = 0;
uint64_t numFreeRows_ = 0;
std::vector<char*> rows_{};
};

inline void RowContainer::extractColumn(const char *const *rows, int32_t num_rows, milvus::exec::RowColumn column,
Expand Down

0 comments on commit 1b0a0c1

Please sign in to comment.