diff --git a/dbms/src/Common/ColumnsHashingImpl.h b/dbms/src/Common/ColumnsHashingImpl.h index 61c4e3e581a..c7cec18c2db 100644 --- a/dbms/src/Common/ColumnsHashingImpl.h +++ b/dbms/src/Common/ColumnsHashingImpl.h @@ -128,15 +128,6 @@ class HashMethodBase using FindResult = FindResultImpl; static constexpr bool has_mapped = !std::is_same::value; using Cache = LastElementCache; - static constexpr size_t prefetch_step = 16; - - template - static ALWAYS_INLINE inline void prefetch(Map & map, size_t idx, const std::vector & hashvals) - { - const auto prefetch_idx = idx + prefetch_step; - if likely (prefetch_idx < hashvals.size()) - map.prefetch(hashvals[prefetch_idx]); - } // Emplace key without hashval, and this method doesn't support prefetch. template @@ -185,49 +176,6 @@ class HashMethodBase return findKeyImpl(keyHolderGetKey(key_holder), data, hashval); } - // Emplace key using hashval, you can enable prefetch or not. - template - ALWAYS_INLINE inline EmplaceResult emplaceKey( - Data & data, - size_t row, - Arena & pool, - std::vector & sort_key_containers, - const std::vector & hashvals) - { - auto key_holder = static_cast(*this).getKeyHolder(row, &pool, sort_key_containers); - if constexpr (enable_prefetch) - { - assert(hashvals.size() == static_cast(*this).total_rows); - prefetch(data, row, hashvals); - return emplaceImpl(key_holder, data, hashvals[row]); - } - else - { - return emplaceImpl(key_holder, data); - } - } - - template - ALWAYS_INLINE inline FindResult findKey( - Data & data, - size_t row, - Arena & pool, - std::vector & sort_key_containers, - const std::vector & hashvals) - { - auto key_holder = static_cast(*this).getKeyHolder(row, &pool, sort_key_containers); - if constexpr (enable_prefetch) - { - assert(hashvals.size() == static_cast(*this).total_rows); - prefetch(data, row, hashvals); - return findKeyImpl(keyHolderGetKey(key_holder), data, hashvals[row]); - } - else - { - return findKeyImpl(keyHolderGetKey(key_holder), data); - } - } - template ALWAYS_INLINE inline size_t getHash( const Data & data, diff --git a/dbms/src/Interpreters/Aggregator.cpp b/dbms/src/Interpreters/Aggregator.cpp index 56ed42aeab1..3eb3e3191a3 100644 --- a/dbms/src/Interpreters/Aggregator.cpp +++ b/dbms/src/Interpreters/Aggregator.cpp @@ -721,38 +721,6 @@ void getHashVals( } } -template -std::optional::ResultType> Aggregator::emplaceOrFindKey( - Method & method, - typename Method::State & state, - size_t index, - Arena & aggregates_pool, - std::vector & sort_key_containers, - const std::vector & hashvals) const -{ - try - { - if constexpr (only_lookup) - return state.template findKey( - method.data, - index, - aggregates_pool, - sort_key_containers, - hashvals); - else - return state.template emplaceKey( - method.data, - index, - aggregates_pool, - sort_key_containers, - hashvals); - } - catch (ResizeException &) - { - return {}; - } -} - template std::optional::ResultType> Aggregator::emplaceOrFindKey( Method & method, @@ -765,19 +733,9 @@ std::optional::Res try { if constexpr (only_lookup) - return state.template findKey( - method.data, - index, - aggregates_pool, - sort_key_containers, - hashval); + return state.template findKey(method.data, index, aggregates_pool, sort_key_containers, hashval); else - return state.template emplaceKey( - method.data, - index, - aggregates_pool, - sort_key_containers, - hashval); + return state.template emplaceKey(method.data, index, aggregates_pool, sort_key_containers, hashval); } catch (ResizeException &) { @@ -879,7 +837,7 @@ ALWAYS_INLINE void Aggregator::executeImplByRow( i, *aggregates_pool, sort_key_containers, - agg_process_info.hashvals); + agg_process_info.hashvals[i]); HANDLE_AGG_EMPLACE_RESULT } @@ -893,7 +851,13 @@ ALWAYS_INLINE void Aggregator::executeImplByRow( } if likely (processed_rows) + { agg_process_info.start_row = *processed_rows + 1; + + if likely (agg_process_info.start_row == agg_process_info.end_row) + agg_process_info.hashvals.clear(); + } + #undef HANDLE_AGG_EMPLACE_RESULT return; } @@ -972,9 +936,9 @@ ALWAYS_INLINE void Aggregator::executeImplByRow( \ if constexpr (collect_hit_rate) \ ++agg_process_info.hit_row_cnt; \ - \ - if constexpr (enable_prefetch) \ - __builtin_prefetch(aggregate_data); \ + \ + if constexpr (enable_prefetch) \ + __builtin_prefetch(aggregate_data); \ } \ } \ \ @@ -993,33 +957,8 @@ ALWAYS_INLINE void Aggregator::executeImplByRow( } } - // for (size_t i = agg_process_info.start_row; i < agg_process_info.start_row + rows; ++i) - // { - // AggregateDataPtr aggregate_data = nullptr; - // if constexpr (enable_prefetch) - // { - // auto emplace_result_holder = emplaceOrFindKey( - // method, - // state, - // i, - // *aggregates_pool, - // sort_key_containers, - // agg_process_info.hashvals); - - // HANDLE_AGG_EMPLACE_RESULT - // } - // else - // { - // auto emplace_result_holder - // = emplaceOrFindKey(method, state, i, *aggregates_pool, sort_key_containers); - - // HANDLE_AGG_EMPLACE_RESULT - // } - // } - size_t i = agg_process_info.start_row; const size_t end = agg_process_info.end_row; - // const size_t end = *processed_rows - agg_process_info.start_row + 1; const size_t mini_batch = 256; while (i < end) { @@ -1028,6 +967,7 @@ ALWAYS_INLINE void Aggregator::executeImplByRow( batch_size = end - i; const auto cur_batch_end = i + batch_size; + // j is the row index inside each mini batch. for (size_t j = i; j < cur_batch_end; ++j) { AggregateDataPtr aggregate_data = nullptr; @@ -1038,18 +978,14 @@ ALWAYS_INLINE void Aggregator::executeImplByRow( const size_t prefetch_idx = j + prefetch_step; if likely (prefetch_idx < end) { - const auto new_hashval = state.getHash(method.data, prefetch_idx, *aggregates_pool, sort_key_containers); + const auto new_hashval + = state.getHash(method.data, prefetch_idx, *aggregates_pool, sort_key_containers); method.data.prefetch(new_hashval); hashvals[prefetch_hash_idx] = new_hashval; } - auto emplace_result_holder = emplaceOrFindKey( - method, - state, - j, - *aggregates_pool, - sort_key_containers, - hashval); + auto emplace_result_holder + = emplaceOrFindKey(method, state, j, *aggregates_pool, sort_key_containers, hashval); HANDLE_AGG_EMPLACE_RESULT } @@ -1065,90 +1001,29 @@ ALWAYS_INLINE void Aggregator::executeImplByRow( if unlikely (!processed_rows.has_value()) break; - size_t processed_size = *processed_rows - i + 1; - // bool first_inst = true; + const size_t processed_size = *processed_rows - i + 1; for (AggregateFunctionInstruction * inst = agg_process_info.aggregate_functions_instructions.data(); inst->that; - ++inst) + ++inst) { - // if (first_inst) - // inst->batch_that->addBatchWithPrefetch( - // i, - // batch_size, - // places.get() + i, - // inst->state_offset, - // inst->batch_arguments, - // aggregates_pool); - // else inst->batch_that->addBatch( - i, - processed_size, - places.get() + i, - inst->state_offset, - inst->batch_arguments, - aggregates_pool); - // first_inst = false; + i, + processed_size, + places.get() + i, + inst->state_offset, + inst->batch_arguments, + aggregates_pool); } - - i += processed_size; if unlikely (processed_size != batch_size) break; + + i = cur_batch_end; } if likely (processed_rows) agg_process_info.start_row = *processed_rows + 1; #undef HANDLE_AGG_EMPLACE_RESULT - - // if (processed_rows) - // { - // // /// Add values to the aggregate functions. - // // for (AggregateFunctionInstruction * inst = agg_process_info.aggregate_functions_instructions.data(); inst->that; - // // ++inst) - // // { - // // inst->batch_that->addBatch( - // // agg_process_info.start_row, - // // *processed_rows - agg_process_info.start_row + 1, - // // places.get(), - // // inst->state_offset, - // // inst->batch_arguments, - // // aggregates_pool); - // // } - // size_t i = agg_process_info.start_row; - // const size_t end = *processed_rows - agg_process_info.start_row + 1; - // const size_t step = 256; - // while (i < end) - // { - // size_t batch_size = step; - // if unlikely (i + batch_size > end) - // batch_size = end - i; - - // bool first_inst = true; - // for (AggregateFunctionInstruction * inst = agg_process_info.aggregate_functions_instructions.data(); inst->that; - // ++inst) - // { - // if (first_inst) - // inst->batch_that->addBatchWithPrefetch( - // i, - // batch_size, - // places.get() + i, - // inst->state_offset, - // inst->batch_arguments, - // aggregates_pool); - // else - // inst->batch_that->addBatch( - // i, - // batch_size, - // places.get() + i, - // inst->state_offset, - // inst->batch_arguments, - // aggregates_pool); - // first_inst = false; - // } - // i += batch_size; - // } - // agg_process_info.start_row = *processed_rows + 1; - // } } void NO_INLINE @@ -1993,12 +1868,13 @@ void NO_INLINE Aggregator::convertToBlocksImplFinal( agg_keys_helpers[key_columns_vec_index] ->insertKeyIntoColumns(key, key_columns_vec[key_columns_vec_index], key_sizes_ref, params.collators); } - // insertAggregatesIntoColumns(mapped, final_aggregate_columns_vec[key_columns_vec_index], arena); places[data_index] = mapped; ++data_index; }); - auto prefetch_idx = 16; + // convertToBlockImplFinal didn't prefetch because it's used during spill. + // places vector will occupy extra memory, also it doesn't care performance during spill. + size_t prefetch_idx = 16; data_index = 0; for (size_t i = 0; i < rows; ++i) { diff --git a/dbms/src/Interpreters/Aggregator.h b/dbms/src/Interpreters/Aggregator.h index 379f2b1b611..8b1fd82b9fb 100644 --- a/dbms/src/Interpreters/Aggregator.h +++ b/dbms/src/Interpreters/Aggregator.h @@ -1318,6 +1318,9 @@ class Aggregator size_t hit_row_cnt = 0; std::vector not_found_rows; + // Used by prefetch. + std::vector hashvals; + void prepareForAgg(); bool allBlockDataHandled() const { @@ -1336,6 +1339,8 @@ class Aggregator hit_row_cnt = 0; not_found_rows.clear(); not_found_rows.reserve(block_.rows() / 2); + + hashvals.clear(); } }; @@ -1460,15 +1465,6 @@ class Aggregator Arena * aggregates_pool, AggProcessInfo & agg_process_info) const; - template - std::optional::ResultType> emplaceOrFindKey( - Method & method, - typename Method::State & state, - size_t index, - Arena & aggregates_pool, - std::vector & sort_key_containers, - const std::vector & hashvals) const; - template std::optional::ResultType> emplaceOrFindKey( Method & method,