Skip to content

Commit

Permalink
clean code
Browse files Browse the repository at this point in the history
Signed-off-by: guo-shaoge <[email protected]>
  • Loading branch information
guo-shaoge committed Dec 24, 2024
1 parent 9623368 commit 0ef6ba6
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 215 deletions.
52 changes: 0 additions & 52 deletions dbms/src/Common/ColumnsHashingImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,15 +128,6 @@ class HashMethodBase
using FindResult = FindResultImpl<Mapped>;
static constexpr bool has_mapped = !std::is_same<Mapped, VoidMapped>::value;
using Cache = LastElementCache<Value, consecutive_keys_optimization>;
static constexpr size_t prefetch_step = 16;

// Ask `map` to prefetch the slot for the hash value stored `prefetch_step`
// entries after `idx` in `hashvals`. Does nothing when that look-ahead
// position is at or beyond the end of `hashvals`.
template <typename Map>
static ALWAYS_INLINE inline void prefetch(Map & map, size_t idx, const std::vector<size_t> & hashvals)
{
    const size_t ahead = idx + prefetch_step;
    // Running off the end only happens for the last `prefetch_step` rows.
    if unlikely (ahead >= hashvals.size())
        return;
    map.prefetch(hashvals[ahead]);
}

// Emplace key without a precomputed hashval; this method doesn't support prefetch.
template <typename Data>
Expand Down Expand Up @@ -185,49 +176,6 @@ class HashMethodBase
return findKeyImpl(keyHolderGetKey(key_holder), data, hashval);
}

// Emplace the key of `row` into `data`; prefetching is selected at compile time.
//  - enable_prefetch == true : `hashvals` must hold one entry per row (asserted against
//    Derived::total_rows); prefetch(data, row, hashvals) is called and then the key is
//    emplaced with the precomputed hashvals[row].
//  - enable_prefetch == false: `hashvals` is not read and emplaceImpl is called without a hash value.
template <bool enable_prefetch = false, typename Data>
ALWAYS_INLINE inline EmplaceResult emplaceKey(
    Data & data,
    size_t row,
    Arena & pool,
    std::vector<String> & sort_key_containers,
    const std::vector<size_t> & hashvals)
{
    auto & derived = static_cast<Derived &>(*this);
    auto key_holder = derived.getKeyHolder(row, &pool, sort_key_containers);
    if constexpr (!enable_prefetch)
    {
        return emplaceImpl(key_holder, data);
    }
    else
    {
        assert(hashvals.size() == derived.total_rows);
        prefetch(data, row, hashvals);
        return emplaceImpl(key_holder, data, hashvals[row]);
    }
}

// Look up the key of `row` in `data`; prefetching is selected at compile time.
//  - enable_prefetch == true : `hashvals` must hold one entry per row (asserted against
//    Derived::total_rows); prefetch(data, row, hashvals) is called and then the lookup
//    uses the precomputed hashvals[row].
//  - enable_prefetch == false: `hashvals` is not read and findKeyImpl is called without a hash value.
template <bool enable_prefetch = false, typename Data>
ALWAYS_INLINE inline FindResult findKey(
    Data & data,
    size_t row,
    Arena & pool,
    std::vector<String> & sort_key_containers,
    const std::vector<size_t> & hashvals)
{
    auto & derived = static_cast<Derived &>(*this);
    auto key_holder = derived.getKeyHolder(row, &pool, sort_key_containers);
    if constexpr (!enable_prefetch)
    {
        return findKeyImpl(keyHolderGetKey(key_holder), data);
    }
    else
    {
        assert(hashvals.size() == derived.total_rows);
        prefetch(data, row, hashvals);
        return findKeyImpl(keyHolderGetKey(key_holder), data, hashvals[row]);
    }
}

template <typename Data>
ALWAYS_INLINE inline size_t getHash(
const Data & data,
Expand Down
184 changes: 30 additions & 154 deletions dbms/src/Interpreters/Aggregator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -721,38 +721,6 @@ void getHashVals(
}
}

// Prefetch-enabled emplace/lookup of the key at row `index`.
// Dispatches at compile time: `only_lookup` selects state.findKey, otherwise state.emplaceKey;
// both are instantiated with enable_prefetch = true and receive the whole `hashvals` vector.
// Returns an empty optional when the call throws ResizeException.
template <bool only_lookup, typename Method>
std::optional<typename Method::template EmplaceOrFindKeyResult<only_lookup>::ResultType> Aggregator::emplaceOrFindKey(
    Method & method,
    typename Method::State & state,
    size_t index,
    Arena & aggregates_pool,
    std::vector<std::string> & sort_key_containers,
    const std::vector<size_t> & hashvals) const
{
    try
    {
        if constexpr (!only_lookup)
        {
            return state.template emplaceKey</*enable_prefetch*/ true>(
                method.data,
                index,
                aggregates_pool,
                sort_key_containers,
                hashvals);
        }
        else
        {
            return state.template findKey</*enable_prefetch*/ true>(
                method.data,
                index,
                aggregates_pool,
                sort_key_containers,
                hashvals);
        }
    }
    catch (ResizeException &)
    {
        // The ResizeException is swallowed here and reported to the caller as "no result".
        return std::nullopt;
    }
}

template <bool only_lookup, typename Method>
std::optional<typename Method::template EmplaceOrFindKeyResult<only_lookup>::ResultType> Aggregator::emplaceOrFindKey(
Method & method,
Expand All @@ -765,19 +733,9 @@ std::optional<typename Method::template EmplaceOrFindKeyResult<only_lookup>::Res
try
{
if constexpr (only_lookup)
return state.template findKey(
method.data,
index,
aggregates_pool,
sort_key_containers,
hashval);
return state.template findKey(method.data, index, aggregates_pool, sort_key_containers, hashval);
else
return state.template emplaceKey(
method.data,
index,
aggregates_pool,
sort_key_containers,
hashval);
return state.template emplaceKey(method.data, index, aggregates_pool, sort_key_containers, hashval);
}
catch (ResizeException &)
{
Expand Down Expand Up @@ -879,7 +837,7 @@ ALWAYS_INLINE void Aggregator::executeImplByRow(
i,
*aggregates_pool,
sort_key_containers,
agg_process_info.hashvals);
agg_process_info.hashvals[i]);

HANDLE_AGG_EMPLACE_RESULT
}
Expand All @@ -893,7 +851,13 @@ ALWAYS_INLINE void Aggregator::executeImplByRow(
}

if likely (processed_rows)
{
agg_process_info.start_row = *processed_rows + 1;

if likely (agg_process_info.start_row == agg_process_info.end_row)
agg_process_info.hashvals.clear();
}

#undef HANDLE_AGG_EMPLACE_RESULT
return;
}
Expand Down Expand Up @@ -972,9 +936,9 @@ ALWAYS_INLINE void Aggregator::executeImplByRow(
\
if constexpr (collect_hit_rate) \
++agg_process_info.hit_row_cnt; \
\
if constexpr (enable_prefetch) \
__builtin_prefetch(aggregate_data); \
\
if constexpr (enable_prefetch) \
__builtin_prefetch(aggregate_data); \
} \
} \
\
Expand All @@ -993,33 +957,8 @@ ALWAYS_INLINE void Aggregator::executeImplByRow(
}
}

// for (size_t i = agg_process_info.start_row; i < agg_process_info.start_row + rows; ++i)
// {
// AggregateDataPtr aggregate_data = nullptr;
// if constexpr (enable_prefetch)
// {
// auto emplace_result_holder = emplaceOrFindKey<only_lookup>(
// method,
// state,
// i,
// *aggregates_pool,
// sort_key_containers,
// agg_process_info.hashvals);

// HANDLE_AGG_EMPLACE_RESULT
// }
// else
// {
// auto emplace_result_holder
// = emplaceOrFindKey<only_lookup>(method, state, i, *aggregates_pool, sort_key_containers);

// HANDLE_AGG_EMPLACE_RESULT
// }
// }

size_t i = agg_process_info.start_row;
const size_t end = agg_process_info.end_row;
// const size_t end = *processed_rows - agg_process_info.start_row + 1;
const size_t mini_batch = 256;
while (i < end)
{
Expand All @@ -1028,6 +967,7 @@ ALWAYS_INLINE void Aggregator::executeImplByRow(
batch_size = end - i;

const auto cur_batch_end = i + batch_size;
// j is the row index inside each mini batch.
for (size_t j = i; j < cur_batch_end; ++j)
{
AggregateDataPtr aggregate_data = nullptr;
Expand All @@ -1038,18 +978,14 @@ ALWAYS_INLINE void Aggregator::executeImplByRow(
const size_t prefetch_idx = j + prefetch_step;
if likely (prefetch_idx < end)
{
const auto new_hashval = state.getHash(method.data, prefetch_idx, *aggregates_pool, sort_key_containers);
const auto new_hashval
= state.getHash(method.data, prefetch_idx, *aggregates_pool, sort_key_containers);
method.data.prefetch(new_hashval);
hashvals[prefetch_hash_idx] = new_hashval;
}

auto emplace_result_holder = emplaceOrFindKey<only_lookup>(
method,
state,
j,
*aggregates_pool,
sort_key_containers,
hashval);
auto emplace_result_holder
= emplaceOrFindKey<only_lookup>(method, state, j, *aggregates_pool, sort_key_containers, hashval);

HANDLE_AGG_EMPLACE_RESULT
}
Expand All @@ -1065,90 +1001,29 @@ ALWAYS_INLINE void Aggregator::executeImplByRow(
if unlikely (!processed_rows.has_value())
break;

size_t processed_size = *processed_rows - i + 1;
// bool first_inst = true;
const size_t processed_size = *processed_rows - i + 1;
for (AggregateFunctionInstruction * inst = agg_process_info.aggregate_functions_instructions.data(); inst->that;
++inst)
++inst)
{
// if (first_inst)
// inst->batch_that->addBatchWithPrefetch(
// i,
// batch_size,
// places.get() + i,
// inst->state_offset,
// inst->batch_arguments,
// aggregates_pool);
// else
inst->batch_that->addBatch(
i,
processed_size,
places.get() + i,
inst->state_offset,
inst->batch_arguments,
aggregates_pool);
// first_inst = false;
i,
processed_size,
places.get() + i,
inst->state_offset,
inst->batch_arguments,
aggregates_pool);
}

i += processed_size;

if unlikely (processed_size != batch_size)
break;

i = cur_batch_end;
}

if likely (processed_rows)
agg_process_info.start_row = *processed_rows + 1;

#undef HANDLE_AGG_EMPLACE_RESULT

// if (processed_rows)
// {
// // /// Add values to the aggregate functions.
// // for (AggregateFunctionInstruction * inst = agg_process_info.aggregate_functions_instructions.data(); inst->that;
// // ++inst)
// // {
// // inst->batch_that->addBatch(
// // agg_process_info.start_row,
// // *processed_rows - agg_process_info.start_row + 1,
// // places.get(),
// // inst->state_offset,
// // inst->batch_arguments,
// // aggregates_pool);
// // }
// size_t i = agg_process_info.start_row;
// const size_t end = *processed_rows - agg_process_info.start_row + 1;
// const size_t step = 256;
// while (i < end)
// {
// size_t batch_size = step;
// if unlikely (i + batch_size > end)
// batch_size = end - i;

// bool first_inst = true;
// for (AggregateFunctionInstruction * inst = agg_process_info.aggregate_functions_instructions.data(); inst->that;
// ++inst)
// {
// if (first_inst)
// inst->batch_that->addBatchWithPrefetch(
// i,
// batch_size,
// places.get() + i,
// inst->state_offset,
// inst->batch_arguments,
// aggregates_pool);
// else
// inst->batch_that->addBatch(
// i,
// batch_size,
// places.get() + i,
// inst->state_offset,
// inst->batch_arguments,
// aggregates_pool);
// first_inst = false;
// }
// i += batch_size;
// }
// agg_process_info.start_row = *processed_rows + 1;
// }
}

void NO_INLINE
Expand Down Expand Up @@ -1993,12 +1868,13 @@ void NO_INLINE Aggregator::convertToBlocksImplFinal(
agg_keys_helpers[key_columns_vec_index]
->insertKeyIntoColumns(key, key_columns_vec[key_columns_vec_index], key_sizes_ref, params.collators);
}
// insertAggregatesIntoColumns(mapped, final_aggregate_columns_vec[key_columns_vec_index], arena);
places[data_index] = mapped;
++data_index;
});

auto prefetch_idx = 16;
// convertToBlockImplFinal doesn't prefetch because it is used during spill:
// the places vector would occupy extra memory, and performance is not a concern during spill.
size_t prefetch_idx = 16;
data_index = 0;
for (size_t i = 0; i < rows; ++i)
{
Expand Down
14 changes: 5 additions & 9 deletions dbms/src/Interpreters/Aggregator.h
Original file line number Diff line number Diff line change
Expand Up @@ -1318,6 +1318,9 @@ class Aggregator
size_t hit_row_cnt = 0;
std::vector<UInt64> not_found_rows;

// Used by prefetch.
std::vector<size_t> hashvals;

void prepareForAgg();
bool allBlockDataHandled() const
{
Expand All @@ -1336,6 +1339,8 @@ class Aggregator
hit_row_cnt = 0;
not_found_rows.clear();
not_found_rows.reserve(block_.rows() / 2);

hashvals.clear();
}
};

Expand Down Expand Up @@ -1460,15 +1465,6 @@ class Aggregator
Arena * aggregates_pool,
AggProcessInfo & agg_process_info) const;

// Emplace (only_lookup == false) or look up (only_lookup == true) the key of row `index`.
// This overload receives the full per-row `hashvals` vector rather than a single hash value.
// The result is wrapped in std::optional; the definition in Aggregator.cpp returns an
// empty optional when a ResizeException is caught.
template <bool only_lookup, typename Method>
std::optional<typename Method::template EmplaceOrFindKeyResult<only_lookup>::ResultType> emplaceOrFindKey(
Method & method,
typename Method::State & state,
size_t index,
Arena & aggregates_pool,
std::vector<std::string> & sort_key_containers,
const std::vector<size_t> & hashvals) const;

template <bool only_lookup, typename Method>
std::optional<typename Method::template EmplaceOrFindKeyResult<only_lookup>::ResultType> emplaceOrFindKey(
Method & method,
Expand Down

0 comments on commit 0ef6ba6

Please sign in to comment.