From 1a0344df16bef3a9591f4782581a0bccd9ce874e Mon Sep 17 00:00:00 2001 From: Pxl Date: Thu, 12 Oct 2023 18:10:21 +0800 Subject: [PATCH] [Improvement](hash) refactor of hash map context (#24966) refactor of hash map context --- .../exec/aggregation_sink_operator.cpp | 135 ++--- .../pipeline/exec/aggregation_sink_operator.h | 24 +- .../exec/aggregation_source_operator.cpp | 32 +- ...ct_streaming_aggregation_sink_operator.cpp | 53 +- ...inct_streaming_aggregation_sink_operator.h | 1 + be/src/pipeline/exec/hashjoin_build_sink.cpp | 20 +- .../exec/partition_sort_sink_operator.cpp | 59 +-- .../exec/partition_sort_sink_operator.h | 11 - .../pipeline/exec/set_probe_sink_operator.cpp | 21 +- be/src/pipeline/exec/set_sink_operator.cpp | 2 +- be/src/pipeline/exec/set_sink_operator.h | 1 + be/src/pipeline/exec/set_source_operator.cpp | 8 +- .../streaming_aggregation_sink_operator.cpp | 5 +- be/src/pipeline/pipeline_x/dependency.cpp | 10 +- .../aggregate_function_collect.h | 13 +- .../aggregate_function_distinct.h | 19 +- .../aggregate_function_map.h | 29 +- .../aggregate_functions/key_holder_helpers.h | 49 -- be/src/vec/columns/column.h | 3 +- be/src/vec/columns/column_const.h | 5 +- be/src/vec/columns/column_decimal.cpp | 3 +- be/src/vec/columns/column_decimal.h | 11 +- be/src/vec/columns/column_nullable.cpp | 2 +- be/src/vec/columns/column_string.cpp | 3 +- be/src/vec/columns/column_string.h | 3 +- be/src/vec/columns/column_vector.cpp | 8 +- be/src/vec/columns/column_vector.h | 3 +- be/src/vec/common/aggregation_common.h | 148 ------ be/src/vec/common/columns_hashing.h | 342 ++---------- be/src/vec/common/columns_hashing_impl.h | 230 +-------- be/src/vec/common/hash_table/fixed_hash_map.h | 27 +- .../vec/common/hash_table/fixed_hash_table.h | 15 +- be/src/vec/common/hash_table/hash_map.h | 37 +- .../vec/common/hash_table/hash_map_context.h | 487 ++++++++++++++++++ be/src/vec/common/hash_table/hash_table.h | 91 ++-- .../common/hash_table/hash_table_key_holder.h | 150 ------ .../common/hash_table/hash_table_set_build.h | 81 +-- .../common/hash_table/hash_table_set_probe.h | 46 +- .../vec/common/hash_table/hash_table_utils.h | 2 - .../common/hash_table/partitioned_hash_map.h | 18 - .../hash_table/partitioned_hash_table.h | 36 +- be/src/vec/common/hash_table/ph_hash_map.h | 67 +-- .../vec/common/hash_table/string_hash_map.h | 57 +- .../vec/common/hash_table/string_hash_table.h | 142 +++-- .../vec/exec/distinct_vaggregation_node.cpp | 54 +- .../vec/exec/join/process_hash_table_probe.h | 12 +- .../exec/join/process_hash_table_probe_impl.h | 134 +---- be/src/vec/exec/join/vhash_join_node.cpp | 19 +- be/src/vec/exec/join/vhash_join_node.h | 216 ++------ be/src/vec/exec/vaggregation_node.cpp | 184 ++----- be/src/vec/exec/vaggregation_node.h | 456 ++-------------- be/src/vec/exec/vpartition_sort_node.cpp | 65 +-- be/src/vec/exec/vpartition_sort_node.h | 264 ++-------- be/src/vec/exec/vset_operation_node.cpp | 36 +- be/src/vec/exec/vset_operation_node.h | 1 + .../array/function_array_enumerate_uniq.cpp | 55 +- be/src/vec/sink/vdata_stream_sender.h | 3 + 57 files changed, 1233 insertions(+), 2775 deletions(-) delete mode 100644 be/src/vec/aggregate_functions/key_holder_helpers.h delete mode 100644 be/src/vec/common/aggregation_common.h create mode 100644 be/src/vec/common/hash_table/hash_map_context.h delete mode 100644 be/src/vec/common/hash_table/hash_table_key_holder.h diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp b/be/src/pipeline/exec/aggregation_sink_operator.cpp index 
251904b024cbc6..419313dce79638 100644 --- a/be/src/pipeline/exec/aggregation_sink_operator.cpp +++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp @@ -128,8 +128,8 @@ Status AggSinkLocalState::init(RuntimeState* state, std::visit( [&](auto&& agg_method) { - using HashTableType = std::decay_t; - using KeyType = typename HashTableType::key_type; + using HashTableType = std::decay_t; + using KeyType = typename HashTableType::Key; /// some aggregate functions (like AVG for decimal) have align issues. Base::_shared_state->aggregate_data_container.reset( @@ -203,17 +203,6 @@ Status AggSinkLocalState::_merge_with_serialized_key( template size_t AggSinkLocalState::_memory_usage() const { size_t usage = 0; - std::visit( - [&](auto&& agg_method) { - using HashMethodType = std::decay_t; - if constexpr (vectorized::ColumnsHashing::IsPreSerializedKeysHashMethodTraits< - HashMethodType>::value) { - usage += agg_method.keys_memory_usage; - } - usage += agg_method.data.get_buffer_size_in_bytes(); - }, - _agg_data->method_variant); - if (_agg_arena_pool) { usage += _agg_arena_pool->size(); } @@ -229,7 +218,7 @@ template void AggSinkLocalState::_update_memusage_with_serialized_key() { std::visit( [&](auto&& agg_method) -> void { - auto& data = agg_method.data; + auto& data = *agg_method.hash_table; auto arena_memory_usage = _agg_arena_pool->size() + Base::_shared_state->aggregate_data_container->memory_usage() - @@ -321,7 +310,7 @@ Status AggSinkLocalState::_merge_with_serialized_key_he for (int i = 0; i < Base::_shared_state->aggregate_evaluators.size(); ++i) { if (Base::_shared_state->aggregate_evaluators[i]->is_merge() || for_spill) { - int col_id; + int col_id = 0; if constexpr (for_spill) { col_id = Base::_shared_state->probe_expr_ctxs.size() + i; } else { @@ -498,7 +487,7 @@ Status AggSinkLocalState::_execute_with_serialized_key_ template size_t AggSinkLocalState::_get_hash_table_size() { - return std::visit([&](auto&& agg_method) { return agg_method.data.size(); }, + return std::visit([&](auto&& agg_method) { return agg_method.hash_table->size(); }, _agg_data->method_variant); } @@ -510,29 +499,20 @@ void AggSinkLocalState::_emplace_into_hash_table( [&](auto&& agg_method) -> void { SCOPED_TIMER(_hash_table_compute_timer); using HashMethodType = std::decay_t; - using HashTableType = std::decay_t; using AggState = typename HashMethodType::State; - AggState state(key_columns, Base::_shared_state->probe_key_sz, nullptr); - - _pre_serialize_key_if_need(state, agg_method, key_columns, num_rows); - - auto creator = [&](const auto& ctor, const auto& key) { - using KeyType = std::decay_t; - if constexpr (HashTableTraits::is_string_hash_table && - !std::is_same_v) { - StringRef string_ref = to_string_ref(key); - vectorized::ArenaKeyHolder key_holder {string_ref, *_agg_arena_pool}; - key_holder_persist_key(key_holder); - auto mapped = Base::_shared_state->aggregate_data_container->append_data( - key_holder.key); - static_cast(Base::_dependency->create_agg_status(mapped)); - ctor(key, mapped); - } else { - auto mapped = - Base::_shared_state->aggregate_data_container->append_data(key); - static_cast(Base::_dependency->create_agg_status(mapped)); - ctor(key, mapped); + AggState state(key_columns, Base::_shared_state->probe_key_sz); + agg_method.init_serialized_keys(key_columns, Base::_shared_state->probe_key_sz, + num_rows); + + auto creator = [this](const auto& ctor, auto& key, auto& origin) { + HashMethodType::try_presis_key(key, origin, *_agg_arena_pool); + auto mapped = + 
Base::_shared_state->aggregate_data_container->append_data(origin); + auto st = Base::_dependency->create_agg_status(mapped); + if (!st) { + throw Exception(st.code(), st.to_string()); } + ctor(key, mapped); }; auto creator_for_null_key = [&](auto& mapped) { @@ -541,49 +521,15 @@ void AggSinkLocalState::_emplace_into_hash_table( ._total_size_of_aggregate_states, Base::_parent->template cast() ._align_aggregate_states); - static_cast(Base::_dependency->create_agg_status(mapped)); - }; - - if constexpr (HashTableTraits::is_phmap) { - const auto& keys = state.get_keys(); - if (_hash_values.size() < num_rows) { - _hash_values.resize(num_rows); - } - for (size_t i = 0; i < num_rows; ++i) { - _hash_values[i] = agg_method.data.hash(keys[i]); + auto st = Base::_dependency->create_agg_status(mapped); + if (!st) { + throw Exception(st.code(), st.to_string()); } + }; - SCOPED_TIMER(_hash_table_emplace_timer); - if constexpr (vectorized::ColumnsHashing::IsSingleNullableColumnMethod< - AggState>::value) { - for (size_t i = 0; i < num_rows; ++i) { - if (LIKELY(i + HASH_MAP_PREFETCH_DIST < num_rows)) { - agg_method.data.prefetch_by_hash( - _hash_values[i + HASH_MAP_PREFETCH_DIST]); - } - - places[i] = state.lazy_emplace_key(agg_method.data, i, *_agg_arena_pool, - _hash_values[i], creator, - creator_for_null_key); - } - } else { - state.lazy_emplace_keys(agg_method.data, keys, _hash_values, creator, - places); - } - } else { - SCOPED_TIMER(_hash_table_emplace_timer); - for (size_t i = 0; i < num_rows; ++i) { - vectorized::AggregateDataPtr mapped = nullptr; - if constexpr (vectorized::ColumnsHashing::IsSingleNullableColumnMethod< - AggState>::value) { - mapped = state.lazy_emplace_key(agg_method.data, i, *_agg_arena_pool, - creator, creator_for_null_key); - } else { - mapped = state.lazy_emplace_key(agg_method.data, i, *_agg_arena_pool, - creator); - } - places[i] = mapped; - } + SCOPED_TIMER(_hash_table_emplace_timer); + for (size_t i = 0; i < num_rows; ++i) { + places[i] = agg_method.lazy_emplace(state, i, creator, creator_for_null_key); } COUNTER_UPDATE(_hash_table_input_counter, num_rows); @@ -598,35 +544,14 @@ void AggSinkLocalState::_find_in_hash_table( std::visit( [&](auto&& agg_method) -> void { using HashMethodType = std::decay_t; - using HashTableType = std::decay_t; using AggState = typename HashMethodType::State; - AggState state(key_columns, Base::_shared_state->probe_key_sz, nullptr); - - _pre_serialize_key_if_need(state, agg_method, key_columns, num_rows); - const auto& keys = state.get_keys(); - if constexpr (HashTableTraits::is_phmap) { - _hash_values.resize(num_rows); - - for (size_t i = 0; i < num_rows; ++i) { - _hash_values[i] = agg_method.data.hash(keys[i]); - } - } + AggState state(key_columns, Base::_shared_state->probe_key_sz); + agg_method.init_serialized_keys(key_columns, Base::_shared_state->probe_key_sz, + num_rows); /// For all rows. 
for (size_t i = 0; i < num_rows; ++i) { - auto find_result = [&]() { - if constexpr (HashTableTraits::is_phmap) { - if (LIKELY(i + HASH_MAP_PREFETCH_DIST < num_rows)) { - agg_method.data.prefetch_by_hash( - _hash_values[i + HASH_MAP_PREFETCH_DIST]); - } - - return state.find_key_with_hash(agg_method.data, _hash_values[i], - keys[i]); - } else { - return state.find_key(agg_method.data, i, *_agg_arena_pool); - } - }(); + auto find_result = agg_method.find(state, i); if (find_result.is_found()) { places[i] = find_result.get_mapped(); @@ -759,7 +684,7 @@ Status AggSinkLocalState::try_spill_disk(bool eos) { } return std::visit( [&](auto&& agg_method) -> Status { - auto& hash_table = agg_method.data; + auto& hash_table = *agg_method.hash_table; if (!eos && _memory_usage() < Base::_parent->template cast() ._external_agg_bytes_threshold) { @@ -935,8 +860,6 @@ Status AggSinkLocalState::close(RuntimeState* state, St std::vector tmp_deserialize_buffer; _deserialize_buffer.swap(tmp_deserialize_buffer); - std::vector tmp_hash_values; - _hash_values.swap(tmp_hash_values); return Base::close(state, exec_status); } diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h b/be/src/pipeline/exec/aggregation_sink_operator.h index 7b2b684bde3abf..de73b12874238c 100644 --- a/be/src/pipeline/exec/aggregation_sink_operator.h +++ b/be/src/pipeline/exec/aggregation_sink_operator.h @@ -77,22 +77,6 @@ class AggSinkLocalState : public PipelineXSinkLocalState { Status _execute_with_serialized_key_helper(vectorized::Block* block); void _find_in_hash_table(vectorized::AggregateDataPtr* places, vectorized::ColumnRawPtrs& key_columns, size_t num_rows); - - template - void _pre_serialize_key_if_need(AggState& state, AggMethod& agg_method, - const vectorized::ColumnRawPtrs& key_columns, - const size_t num_rows) { - if constexpr (vectorized::ColumnsHashing::IsPreSerializedKeysHashMethodTraits< - AggState>::value) { - auto old_keys_memory = agg_method.keys_memory_usage; - SCOPED_TIMER(_serialize_key_timer); - int64_t row_size = (int64_t)(agg_method.serialize_keys(key_columns, num_rows)); - COUNTER_SET(_max_row_size_counter, std::max(_max_row_size_counter->value(), row_size)); - state.set_serialized_keys(agg_method.keys.data()); - - _serialize_key_arena_memory_usage->add(agg_method.keys_memory_usage - old_keys_memory); - } - } void _emplace_into_hash_table(vectorized::AggregateDataPtr* places, vectorized::ColumnRawPtrs& key_columns, const size_t num_rows); size_t _get_hash_table_size(); @@ -123,7 +107,7 @@ class AggSinkLocalState : public PipelineXSinkLocalState { ->create_serialize_column(); } - context.init_once(); + context.init_iterator(); const auto size = hash_table.size(); std::vector keys(size); if (Base::_shared_state->values.size() < size) { @@ -156,7 +140,8 @@ class AggSinkLocalState : public PipelineXSinkLocalState { key_columns[0]->insert_data(nullptr, 0); // Here is no need to set `keys[num_rows]`, keep it as default value. 
- Base::_shared_state->values[num_rows] = hash_table.get_null_key_data(); + Base::_shared_state->values[num_rows] = + hash_table.template get_null_key_data(); ++num_rows; } @@ -304,7 +289,6 @@ class AggSinkLocalState : public PipelineXSinkLocalState { vectorized::AggregatedDataVariants* _agg_data; vectorized::Arena* _agg_arena_pool; - std::vector _hash_values; using vectorized_execute = std::function; using vectorized_update_memusage = std::function; @@ -325,7 +309,7 @@ class BlockingAggSinkLocalState BlockingAggSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state) : AggSinkLocalState(parent, state) {} - ~BlockingAggSinkLocalState() = default; + ~BlockingAggSinkLocalState() override = default; }; template diff --git a/be/src/pipeline/exec/aggregation_source_operator.cpp b/be/src/pipeline/exec/aggregation_source_operator.cpp index c5d7987ab8a139..fd192e04ab3f1d 100644 --- a/be/src/pipeline/exec/aggregation_source_operator.cpp +++ b/be/src/pipeline/exec/aggregation_source_operator.cpp @@ -19,6 +19,7 @@ #include +#include "common/exception.h" #include "pipeline/exec/operator.h" #include "pipeline/exec/streaming_aggregation_source_operator.h" #include "vec//utils/util.hpp" @@ -81,7 +82,7 @@ Status AggLocalState::init(RuntimeState* state, LocalStateInfo& info) { void AggLocalState::_close_with_serialized_key() { std::visit( [&](auto&& agg_method) -> void { - auto& data = agg_method.data; + auto& data = *agg_method.hash_table; data.for_each_mapped([&](auto& mapped) { if (mapped) { static_cast(_dependency->destroy_agg_status(mapped)); @@ -89,7 +90,11 @@ void AggLocalState::_close_with_serialized_key() { } }); if (data.has_null_key_data()) { - static_cast(_dependency->destroy_agg_status(data.get_null_key_data())); + auto st = _dependency->destroy_agg_status( + data.template get_null_key_data()); + if (!st) { + throw Exception(st.code(), st.to_string()); + } } }, _agg_data->method_variant); @@ -170,8 +175,8 @@ Status AggLocalState::_serialize_with_serialized_key_result_non_spill(RuntimeSta SCOPED_TIMER(_get_results_timer); std::visit( [&](auto&& agg_method) -> void { - agg_method.init_once(); - auto& data = agg_method.data; + agg_method.init_iterator(); + auto& data = *agg_method.hash_table; const auto size = std::min(data.size(), size_t(state->batch_size())); using KeyType = std::decay_tget_first())>; std::vector keys(size); @@ -201,14 +206,16 @@ Status AggLocalState::_serialize_with_serialized_key_result_non_spill(RuntimeSta } if (iter == _shared_state->aggregate_data_container->end()) { - if (agg_method.data.has_null_key_data()) { + if (agg_method.hash_table->has_null_key_data()) { // only one key of group by support wrap null key // here need additional processing logic on the null key / value DCHECK(key_columns.size() == 1); DCHECK(key_columns[0]->is_nullable()); - if (agg_method.data.has_null_key_data()) { + if (agg_method.hash_table->has_null_key_data()) { key_columns[0]->insert_data(nullptr, 0); - _shared_state->values[num_rows] = agg_method.data.get_null_key_data(); + _shared_state->values[num_rows] = + agg_method.hash_table->template get_null_key_data< + vectorized::AggregateDataPtr>(); ++num_rows; source_state = SourceState::FINISHED; } @@ -325,8 +332,8 @@ Status AggLocalState::_get_result_with_serialized_key_non_spill(RuntimeState* st SCOPED_TIMER(_get_results_timer); std::visit( [&](auto&& agg_method) -> void { - auto& data = agg_method.data; - agg_method.init_once(); + auto& data = *agg_method.hash_table; + agg_method.init_iterator(); const auto size = 
std::min(data.size(), size_t(state->batch_size())); using KeyType = std::decay_tget_first())>; std::vector keys(size); @@ -362,14 +369,15 @@ Status AggLocalState::_get_result_with_serialized_key_non_spill(RuntimeState* st } if (iter == _shared_state->aggregate_data_container->end()) { - if (agg_method.data.has_null_key_data()) { + if (agg_method.hash_table->has_null_key_data()) { // only one key of group by support wrap null key // here need additional processing logic on the null key / value DCHECK(key_columns.size() == 1); DCHECK(key_columns[0]->is_nullable()); if (key_columns[0]->size() < state->batch_size()) { key_columns[0]->insert_data(nullptr, 0); - auto mapped = agg_method.data.get_null_key_data(); + auto mapped = agg_method.hash_table->template get_null_key_data< + vectorized::AggregateDataPtr>(); for (size_t i = 0; i < _shared_state->aggregate_evaluators.size(); ++i) _shared_state->aggregate_evaluators[i]->insert_result_info( mapped + _dependency->offsets_of_aggregate_states()[i], @@ -538,7 +546,7 @@ Status AggLocalState::close(RuntimeState* state) { if (_hash_table_size_counter) { std::visit( [&](auto&& agg_method) { - COUNTER_SET(_hash_table_size_counter, int64_t(agg_method.data.size())); + COUNTER_SET(_hash_table_size_counter, int64_t(agg_method.hash_table->size())); }, _agg_data->method_variant); } diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.cpp b/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.cpp index 9f0fafeb54feb7..4651da4ecd09c5 100644 --- a/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.cpp +++ b/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.cpp @@ -155,43 +155,26 @@ void DistinctStreamingAggSinkLocalState::_emplace_into_hash_table_to_distinct( [&](auto&& agg_method) -> void { SCOPED_TIMER(_hash_table_compute_timer); using HashMethodType = std::decay_t; - using HashTableType = std::decay_t; using AggState = typename HashMethodType::State; - AggState state(key_columns, _shared_state->probe_key_sz, nullptr); - _pre_serialize_key_if_need(state, agg_method, key_columns, num_rows); - - if constexpr (HashTableTraits::is_phmap) { - const auto& keys = state.get_keys(); - if (_hash_values.size() < num_rows) { - _hash_values.resize(num_rows); - } - - for (size_t i = 0; i < num_rows; ++i) { - _hash_values[i] = agg_method.data.hash(keys[i]); - } - SCOPED_TIMER(_hash_table_emplace_timer); - for (size_t i = 0; i < num_rows; ++i) { - if (LIKELY(i + HASH_MAP_PREFETCH_DIST < num_rows)) { - agg_method.data.prefetch_by_hash( - _hash_values[i + HASH_MAP_PREFETCH_DIST]); - } - auto result = state.emplace_with_key( - agg_method.data, state.pack_key_holder(keys[i], *_agg_arena_pool), - _hash_values[i], i); - if (result.is_inserted()) { - distinct_row.push_back(i); - } - } - } else { - SCOPED_TIMER(_hash_table_emplace_timer); - for (size_t i = 0; i < num_rows; ++i) { - auto result = state.emplace_key(agg_method.data, i, *_agg_arena_pool); - if (result.is_inserted()) { - result.set_mapped(dummy_mapped_data.get()); - distinct_row.push_back(i); - } - } + AggState state(key_columns, _shared_state->probe_key_sz); + agg_method.init_serialized_keys(key_columns, Base::_shared_state->probe_key_sz, + num_rows); + size_t row = 0; + auto creator = [&](const auto& ctor, auto& key, auto& origin) { + HashMethodType::try_presis_key(key, origin, _arena); + ctor(key, dummy_mapped_data.get()); + distinct_row.push_back(row); + }; + auto creator_for_null_key = [&](auto& mapped) { + mapped = dummy_mapped_data.get(); + 
distinct_row.push_back(row); + }; + + SCOPED_TIMER(_hash_table_emplace_timer); + for (; row < num_rows; ++row) { + agg_method.lazy_emplace(state, row, creator, creator_for_null_key); } + COUNTER_UPDATE(_hash_table_input_counter, num_rows); }, _agg_data->method_variant); diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.h b/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.h index 0c909b413b74e4..f0e938a745d747 100644 --- a/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.h +++ b/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.h @@ -103,6 +103,7 @@ class DistinctStreamingAggSinkLocalState final std::unique_ptr _output_block = vectorized::Block::create_unique(); std::shared_ptr dummy_mapped_data = nullptr; vectorized::IColumn::Selector _distinct_row; + vectorized::Arena _arena; int64_t _output_distinct_rows = 0; }; diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp b/be/src/pipeline/exec/hashjoin_build_sink.cpp index 99aeaf0a1c8462..c9fe153af08f9f 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.cpp +++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp @@ -22,7 +22,6 @@ #include "exprs/bloom_filter_func.h" #include "pipeline/exec/hashjoin_probe_operator.h" #include "pipeline/exec/operator.h" -#include "vec/common/aggregation_common.h" #include "vec/exec/join/vhash_join_node.h" #include "vec/utils/template_helpers.hpp" @@ -381,7 +380,7 @@ void HashJoinBuildSinkLocalState::_hash_table_init(RuntimeState* state) { __builtin_unreachable(); }, [&](auto&& arg) { - arg.hash_table.set_partitioned_threshold( + arg.hash_table->set_partitioned_threshold( state->partitioned_hash_join_rows_threshold()); }}, *_shared_state->hash_table_variants); @@ -567,9 +566,18 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block* print_id(_shared_hashtable_controller->get_builder_fragment_instance_id(id()))); local_state._shared_state->_has_null_in_build_side = _shared_hash_table_context->short_circuit_for_null_in_probe_side; - local_state._shared_state->hash_table_variants = - std::static_pointer_cast( - _shared_hash_table_context->hash_table_variants); + std::visit( + [](auto&& dst, auto&& src) { + if constexpr (!std::is_same_v> && + std::is_same_v, + std::decay_t>) { + dst.hash_table = src.hash_table; + } + }, + *local_state._shared_state->hash_table_variants, + *std::static_pointer_cast( + _shared_hash_table_context->hash_table_variants)); + local_state._shared_state->build_blocks = _shared_hash_table_context->blocks; if (!_shared_hash_table_context->runtime_filters.empty()) { @@ -588,7 +596,7 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block* _build_expr_ctxs, _runtime_filter_descs); RETURN_IF_ERROR(local_state._runtime_filter_slots->init( - state, arg.hash_table.size(), 0)); + state, arg.hash_table->size(), 0)); RETURN_IF_ERROR( local_state._runtime_filter_slots->copy_from_shared_context( _shared_hash_table_context)); diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.cpp b/be/src/pipeline/exec/partition_sort_sink_operator.cpp index fcdbb685358e74..907114829ee01e 100644 --- a/be/src/pipeline/exec/partition_sort_sink_operator.cpp +++ b/be/src/pipeline/exec/partition_sort_sink_operator.cpp @@ -178,52 +178,31 @@ void PartitionSortSinkOperatorX::_emplace_into_hash_table( [&](auto&& agg_method) -> void { SCOPED_TIMER(local_state._build_timer); using HashMethodType = std::decay_t; - using HashTableType = std::decay_t; using AggState = typename 
HashMethodType::State; - AggState state(key_columns, local_state._partition_key_sz, nullptr); + AggState state(key_columns, local_state._partition_key_sz); size_t num_rows = input_block->rows(); - _pre_serialize_key_if_need(state, agg_method, key_columns, num_rows); + agg_method.init_serialized_keys(key_columns, local_state._partition_key_sz, + num_rows); - //PHHashMap - const auto& keys = state.get_keys(); - if constexpr (HashTableTraits::is_phmap) { - local_state._hash_values.resize(num_rows); - - for (size_t i = 0; i < num_rows; ++i) { - local_state._hash_values[i] = agg_method.data.hash(keys[i]); - } - } + auto creator = [&](const auto& ctor, auto& key, auto& origin) { + HashMethodType::try_presis_key(key, origin, *local_state._agg_arena_pool); + auto aggregate_data = _pool->add(new vectorized::PartitionBlocks()); + local_state._value_places.push_back(aggregate_data); + ctor(key, aggregate_data); + local_state._num_partition++; + }; + auto creator_for_null_key = [&](auto& mapped) { + mapped = _pool->add(new vectorized::PartitionBlocks()); + local_state._value_places.push_back(mapped); + local_state._num_partition++; + }; + SCOPED_TIMER(local_state._emplace_key_timer); for (size_t row = 0; row < num_rows; ++row) { - SCOPED_TIMER(local_state._emplace_key_timer); - vectorized::PartitionDataPtr aggregate_data = nullptr; - auto emplace_result = [&]() { - if constexpr (HashTableTraits::is_phmap) { - if (LIKELY(row + HASH_MAP_PREFETCH_DIST < num_rows)) { - agg_method.data.prefetch_by_hash( - local_state._hash_values[row + HASH_MAP_PREFETCH_DIST]); - } - return state.emplace_with_key(agg_method.data, keys[row], - local_state._hash_values[row], row); - } else { - return state.emplace_with_key(agg_method.data, keys[row], row); - } - }(); - - /// If a new key is inserted, initialize the states of the aggregate functions, and possibly something related to the key. - if (emplace_result.is_inserted()) { - /// exception-safety - if you can not allocate memory or create states, then destructors will not be called. 
- emplace_result.set_mapped(nullptr); - aggregate_data = _pool->add(new vectorized::PartitionBlocks()); - emplace_result.set_mapped(aggregate_data); - local_state._value_places.push_back(aggregate_data); - local_state._num_partition++; - } else { - aggregate_data = emplace_result.get_mapped(); - } - assert(aggregate_data != nullptr); - aggregate_data->add_row_idx(row); + auto& mapped = + agg_method.lazy_emplace(state, row, creator, creator_for_null_key); + mapped->add_row_idx(row); } for (auto place : local_state._value_places) { SCOPED_TIMER(local_state._selector_block_timer); diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.h b/be/src/pipeline/exec/partition_sort_sink_operator.h index 6c124bf3b19cc8..59517642bf4f2a 100644 --- a/be/src/pipeline/exec/partition_sort_sink_operator.h +++ b/be/src/pipeline/exec/partition_sort_sink_operator.h @@ -70,7 +70,6 @@ class PartitionSortSinkLocalState : public PipelineXSinkLocalState _value_places; int _num_partition = 0; std::vector _partition_columns; - std::vector _hash_values; std::unique_ptr _partitioned_data; std::unique_ptr _agg_arena_pool; std::vector _partition_key_sz; @@ -125,16 +124,6 @@ class PartitionSortSinkOperatorX final : public DataSinkOperatorX - void _pre_serialize_key_if_need(AggState& state, AggMethod& agg_method, - const vectorized::ColumnRawPtrs& key_columns, - const size_t num_rows) { - if constexpr (vectorized::ColumnsHashing::IsPreSerializedKeysHashMethodTraits< - AggState>::value) { - (agg_method.serialize_keys(key_columns, num_rows)); - state.set_serialized_keys(agg_method.keys.data()); - } - } }; } // namespace pipeline diff --git a/be/src/pipeline/exec/set_probe_sink_operator.cpp b/be/src/pipeline/exec/set_probe_sink_operator.cpp index 06bb680ae3d877..055709da3a208d 100644 --- a/be/src/pipeline/exec/set_probe_sink_operator.cpp +++ b/be/src/pipeline/exec/set_probe_sink_operator.cpp @@ -222,7 +222,7 @@ void SetProbeSinkOperatorX::_finalize_probe( [&](auto&& arg) { using HashTableCtxType = std::decay_t; if constexpr (!std::is_same_v) { - valid_element_in_hash_tbl = arg.hash_table.size(); + valid_element_in_hash_tbl = arg.hash_table->size(); } }, *hash_table_variants); @@ -248,15 +248,15 @@ void SetProbeSinkOperatorX::_refresh_hash_table( vectorized::RowRefListWithFlags>) { HashTableCtxType tmp_hash_table; bool is_need_shrink = - arg.hash_table.should_be_shrink(valid_element_in_hash_tbl); + arg.hash_table->should_be_shrink(valid_element_in_hash_tbl); if (is_intersect || is_need_shrink) { - tmp_hash_table.hash_table.init_buf_size( - valid_element_in_hash_tbl / arg.hash_table.get_factor() + 1); + tmp_hash_table.hash_table->init_buf_size( + valid_element_in_hash_tbl / arg.hash_table->get_factor() + 1); } - arg.init_once(); - auto& iter = arg.iter; - auto iter_end = arg.hash_table.end(); + arg.init_iterator(); + auto& iter = arg.iterator; + auto iter_end = arg.hash_table->end(); std::visit( [&](auto is_need_shrink_const) { while (iter != iter_end) { @@ -266,13 +266,14 @@ void SetProbeSinkOperatorX::_refresh_hash_table( if constexpr (is_intersect) { //intersected if (it->visited) { it->visited = false; - tmp_hash_table.hash_table.insert(iter->get_value()); + tmp_hash_table.hash_table->insert( + iter->get_value()); } ++iter; } else { //except if constexpr (is_need_shrink_const) { if (!it->visited) { - tmp_hash_table.hash_table.insert( + tmp_hash_table.hash_table->insert( iter->get_value()); } } @@ -282,7 +283,7 @@ void SetProbeSinkOperatorX::_refresh_hash_table( }, vectorized::make_bool_variant(is_need_shrink)); - 
arg.inited = false; + arg.reset(); if (is_intersect || is_need_shrink) { arg.hash_table = std::move(tmp_hash_table.hash_table); } diff --git a/be/src/pipeline/exec/set_sink_operator.cpp b/be/src/pipeline/exec/set_sink_operator.cpp index 0c3fcbb5396b03..604729a4700b05 100644 --- a/be/src/pipeline/exec/set_sink_operator.cpp +++ b/be/src/pipeline/exec/set_sink_operator.cpp @@ -85,7 +85,7 @@ Status SetSinkOperatorX::sink(RuntimeState* state, vectorized::Blo [&](auto&& arg) { using HashTableCtxType = std::decay_t; if constexpr (!std::is_same_v) { - valid_element_in_hash_tbl = arg.hash_table.size(); + valid_element_in_hash_tbl = arg.hash_table->size(); } }, *local_state._shared_state->hash_table_variants); diff --git a/be/src/pipeline/exec/set_sink_operator.h b/be/src/pipeline/exec/set_sink_operator.h index ff5c713f38dad7..945ec06891c0af 100644 --- a/be/src/pipeline/exec/set_sink_operator.h +++ b/be/src/pipeline/exec/set_sink_operator.h @@ -83,6 +83,7 @@ class SetSinkLocalState final : public PipelineXSinkLocalState { vectorized::MutableBlock _mutable_block; // every child has its result expr list vectorized::VExprContextSPtrs _child_exprs; + vectorized::Arena _arena; }; template diff --git a/be/src/pipeline/exec/set_source_operator.cpp b/be/src/pipeline/exec/set_source_operator.cpp index ef18f0e8a4ac96..58cf33c90b5927 100644 --- a/be/src/pipeline/exec/set_source_operator.cpp +++ b/be/src/pipeline/exec/set_source_operator.cpp @@ -118,14 +118,14 @@ template Status SetSourceOperatorX::_get_data_in_hashtable( SetSourceLocalState& local_state, HashTableContext& hash_table_ctx, vectorized::Block* output_block, const int batch_size, SourceState& source_state) { - hash_table_ctx.init_once(); int left_col_len = local_state._left_table_data_types.size(); - auto& iter = hash_table_ctx.iter; + hash_table_ctx.init_iterator(); + auto& iter = hash_table_ctx.iterator; auto block_size = 0; if constexpr (std::is_same_v) { - for (; iter != hash_table_ctx.hash_table.end() && block_size < batch_size; ++iter) { + for (; iter != hash_table_ctx.hash_table->end() && block_size < batch_size; ++iter) { auto& value = iter->get_second(); auto it = value.begin(); if constexpr (is_intersect) { @@ -142,7 +142,7 @@ Status SetSourceOperatorX::_get_data_in_hashtable( return Status::InternalError("Invalid RowRefListType!"); } - if (iter == hash_table_ctx.hash_table.end()) { + if (iter == hash_table_ctx.hash_table->end()) { source_state = SourceState::FINISHED; } if (!output_block->mem_reuse()) { diff --git a/be/src/pipeline/exec/streaming_aggregation_sink_operator.cpp b/be/src/pipeline/exec/streaming_aggregation_sink_operator.cpp index 154d0144f3436d..9b11ef361e880c 100644 --- a/be/src/pipeline/exec/streaming_aggregation_sink_operator.cpp +++ b/be/src/pipeline/exec/streaming_aggregation_sink_operator.cpp @@ -170,7 +170,7 @@ bool StreamingAggSinkLocalState::_should_expand_preagg_hash_tables() { return std::visit( [&](auto&& agg_method) -> bool { - auto& hash_tbl = agg_method.data; + auto& hash_tbl = *agg_method.hash_table; auto [ht_mem, ht_rows] = std::pair {hash_tbl.get_buffer_size_in_bytes(), hash_tbl.size()}; @@ -254,7 +254,8 @@ Status StreamingAggSinkLocalState::_pre_agg_with_serialized_key( bool ret_flag = false; RETURN_IF_ERROR(std::visit( [&](auto&& agg_method) -> Status { - if (auto& hash_tbl = agg_method.data; hash_tbl.add_elem_size_overflow(rows)) { + if (auto& hash_tbl = *agg_method.hash_table; + hash_tbl.add_elem_size_overflow(rows)) { /// If too much memory is used during the pre-aggregation stage, /// it is better to 
output the data directly without performing further aggregation. const bool used_too_much_memory = diff --git a/be/src/pipeline/pipeline_x/dependency.cpp b/be/src/pipeline/pipeline_x/dependency.cpp index 743fc12ee1588b..d56679f32a24b8 100644 --- a/be/src/pipeline/pipeline_x/dependency.cpp +++ b/be/src/pipeline/pipeline_x/dependency.cpp @@ -68,14 +68,10 @@ std::string OrDependency::debug_string(int indentation_level) { Status AggDependency::reset_hash_table() { return std::visit( [&](auto&& agg_method) { - auto& hash_table = agg_method.data; - using HashMethodType = std::decay_t; + auto& hash_table = *agg_method.hash_table; using HashTableType = std::decay_t; - if constexpr (vectorized::ColumnsHashing::IsPreSerializedKeysHashMethodTraits< - HashMethodType>::value) { - agg_method.reset(); - } + agg_method.reset(); hash_table.for_each_mapped([&](auto& mapped) { if (mapped) { @@ -125,7 +121,7 @@ Status AggDependency::merge_spilt_data() { CHECK_LT(_agg_state.spill_context.read_cursor, reader->block_count()); reader->seek(_agg_state.spill_context.read_cursor); vectorized::Block block; - bool eos; + bool eos = false; RETURN_IF_ERROR(reader->read(&block, &eos)); // TODO diff --git a/be/src/vec/aggregate_functions/aggregate_function_collect.h b/be/src/vec/aggregate_functions/aggregate_function_collect.h index 78aa01ac68268b..0a2e9c443f10cb 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_collect.h +++ b/be/src/vec/aggregate_functions/aggregate_function_collect.h @@ -27,7 +27,6 @@ #include #include "vec/aggregate_functions/aggregate_function.h" -#include "vec/aggregate_functions/key_holder_helpers.h" #include "vec/columns/column.h" #include "vec/columns/column_array.h" #include "vec/columns/column_decimal.h" @@ -36,7 +35,6 @@ #include "vec/columns/columns_number.h" #include "vec/common/assert_cast.h" #include "vec/common/hash_table/hash_set.h" -#include "vec/common/hash_table/hash_table_key_holder.h" #include "vec/common/pod_array_fwd.h" #include "vec/common/string_buffer.hpp" #include "vec/common/string_ref.h" @@ -115,7 +113,7 @@ struct AggregateFunctionCollectSetData { using ElementType = StringRef; using ColVecType = ColumnString; using SelfType = AggregateFunctionCollectSetData; - using Set = HashSetWithSavedHashWithStackMemory, 4>; + using Set = HashSetWithStackMemory, 4>; Set data_set; Int64 max_size = -1; @@ -124,8 +122,9 @@ struct AggregateFunctionCollectSetData { void add(const IColumn& column, size_t row_num, Arena* arena) { Set::LookupResult it; bool inserted; - auto key_holder = get_key_holder(column, row_num, *arena); - data_set.emplace(key_holder, it, inserted); + auto key = column.get_data_at(row_num); + key.data = arena->insert(key.data, key.size); + data_set.emplace(key, it, inserted); } void merge(const SelfType& rhs, Arena* arena) { @@ -141,7 +140,9 @@ struct AggregateFunctionCollectSetData { } } assert(arena != nullptr); - data_set.emplace(ArenaKeyHolder {rhs_elem.get_value(), *arena}, it, inserted); + StringRef key = rhs_elem.get_value(); + key.data = arena->insert(key.data, key.size); + data_set.emplace(key, it, inserted); } } diff --git a/be/src/vec/aggregate_functions/aggregate_function_distinct.h b/be/src/vec/aggregate_functions/aggregate_function_distinct.h index 769b4ff80522bf..3b4968050aeb7a 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_distinct.h +++ b/be/src/vec/aggregate_functions/aggregate_function_distinct.h @@ -31,11 +31,9 @@ #include #include "vec/aggregate_functions/aggregate_function.h" -#include 
"vec/aggregate_functions/key_holder_helpers.h" #include "vec/columns/column.h" #include "vec/common/assert_cast.h" #include "vec/common/hash_table/hash_set.h" -#include "vec/common/hash_table/hash_table_key_holder.h" #include "vec/common/string_ref.h" #include "vec/core/types.h" #include "vec/data_types/data_type.h" @@ -87,7 +85,7 @@ struct AggregateFunctionDistinctSingleNumericData { struct AggregateFunctionDistinctGenericData { /// When creating, the hash table must be small. - using Set = HashSetWithSavedHashWithStackMemory; + using Set = HashSetWithStackMemory; using Self = AggregateFunctionDistinctGenericData; Set set; @@ -95,7 +93,9 @@ struct AggregateFunctionDistinctGenericData { Set::LookupResult it; bool inserted; for (const auto& elem : rhs.set) { - set.emplace(ArenaKeyHolder {elem.get_value(), *arena}, it, inserted); + StringRef key = elem.get_value(); + key.data = arena->insert(key.data, key.size); + set.emplace(key, it, inserted); } } @@ -123,15 +123,16 @@ struct AggregateFunctionDistinctSingleGenericData : public AggregateFunctionDist void add(const IColumn** columns, size_t /* columns_num */, size_t row_num, Arena* arena) { Set::LookupResult it; bool inserted; - auto key_holder = get_key_holder(*columns[0], row_num, *arena); - set.emplace(key_holder, it, inserted); + auto key = columns[0]->get_data_at(row_num); + key.data = arena->insert(key.data, key.size); + set.emplace(key, it, inserted); } MutableColumns get_arguments(const DataTypes& argument_types) const { MutableColumns argument_columns; argument_columns.emplace_back(argument_types[0]->create_column()); for (const auto& elem : set) { - deserialize_and_insert(elem.get_value(), *argument_columns[0]); + argument_columns[0]->insert_data(elem.get_value().data, elem.get_value().size); } return argument_columns; @@ -150,8 +151,8 @@ struct AggregateFunctionDistinctMultipleGenericData : public AggregateFunctionDi Set::LookupResult it; bool inserted; - auto key_holder = SerializedKeyHolder {value, *arena}; - set.emplace(key_holder, it, inserted); + value.data = arena->insert(value.data, value.size); + set.emplace(value, it, inserted); } MutableColumns get_arguments(const DataTypes& argument_types) const { diff --git a/be/src/vec/aggregate_functions/aggregate_function_map.h b/be/src/vec/aggregate_functions/aggregate_function_map.h index f22a1e7b50beab..a1378ba07da70b 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_map.h +++ b/be/src/vec/aggregate_functions/aggregate_function_map.h @@ -27,7 +27,6 @@ #include "vec/columns/column_string.h" #include "vec/columns/columns_number.h" #include "vec/common/assert_cast.h" -#include "vec/common/hash_table/hash_table_key_holder.h" #include "vec/common/string_ref.h" #include "vec/core/types.h" #include "vec/data_types/data_type_factory.hpp" @@ -65,13 +64,10 @@ struct AggregateFunctionMapAggData { return; } - ArenaKeyHolder key_holder {key, _arena}; - if (key.size > 0) { - key_holder_persist_key(key_holder); - } + _arena.insert(key.data, key.size); - _map.emplace(key_holder.key, _key_column->size()); - _key_column->insert_data(key_holder.key.data, key_holder.key.size); + _map.emplace(key, _key_column->size()); + _key_column->insert_data(key.data, key.size); _value_column->insert(value); } @@ -99,13 +95,10 @@ struct AggregateFunctionMapAggData { return; } - ArenaKeyHolder key_holder {key, _arena}; - if (key.size > 0) { - key_holder_persist_key(key_holder); - } + _arena.insert(key.data, key.size); - _map.emplace(key_holder.key, _key_column->size()); - 
_key_column->insert_data(key_holder.key.data, key_holder.key.size); + _map.emplace(key, _key_column->size()); + _key_column->insert_data(key.data, key.size); _value_column->insert(value_array[i]); } } @@ -123,14 +116,10 @@ struct AggregateFunctionMapAggData { if (_map.find(key) != _map.cend()) { continue; } - ArenaKeyHolder key_holder {key, _arena}; - if (key.size > 0) { - key_holder_persist_key(key_holder); - } + _arena.insert(key.data, key.size); - _map.emplace(key_holder.key, _key_column->size()); - static_cast(*_key_column) - .insert_data(key_holder.key.data, key_holder.key.size); + _map.emplace(key, _key_column->size()); + static_cast(*_key_column).insert_data(key.data, key.size); auto value = other._value_column->get_data_at(i); _value_column->insert_data(value.data, value.size); diff --git a/be/src/vec/aggregate_functions/key_holder_helpers.h b/be/src/vec/aggregate_functions/key_holder_helpers.h deleted file mode 100644 index e31ead7029f327..00000000000000 --- a/be/src/vec/aggregate_functions/key_holder_helpers.h +++ /dev/null @@ -1,49 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
-// This file is copied from -// https://github.com/ClickHouse/ClickHouse/blob/master/src/AggregateFunctions/KeyHolderHelpers.h -// and modified by Doris - -#pragma once - -#include "vec/columns/column.h" -#include "vec/common/hash_table/hash_table_key_holder.h" - -namespace doris::vectorized { - -template -auto get_key_holder(const IColumn& column, size_t row_num, Arena& arena) { - if constexpr (is_plain_column) { - return ArenaKeyHolder {column.get_data_at(row_num), arena}; - } else { - const char* begin = nullptr; - StringRef serialized = column.serialize_value_into_arena(row_num, arena, begin); - assert(serialized.data != nullptr); - return SerializedKeyHolder {serialized, arena}; - } -} - -template -void deserialize_and_insert(StringRef str, IColumn& data_to) { - if constexpr (is_plain_column) { - data_to.insert_data(str.data, str.size); - } else { - data_to.deserialize_and_insert_from_arena(str.data); - } -} - -} // namespace doris::vectorized diff --git a/be/src/vec/columns/column.h b/be/src/vec/columns/column.h index 1f7b4271a5f283..45ca44ee05787f 100644 --- a/be/src/vec/columns/column.h +++ b/be/src/vec/columns/column.h @@ -338,8 +338,7 @@ class IColumn : public COW { } virtual void serialize_vec_with_null_map(std::vector& keys, size_t num_rows, - const uint8_t* null_map, - size_t max_row_byte_size) const { + const uint8_t* null_map) const { LOG(FATAL) << "serialize_vec_with_null_map not supported"; } diff --git a/be/src/vec/columns/column_const.h b/be/src/vec/columns/column_const.h index eb647f8a045f02..016a18f216f9f2 100644 --- a/be/src/vec/columns/column_const.h +++ b/be/src/vec/columns/column_const.h @@ -167,9 +167,8 @@ class ColumnConst final : public COWHelper { } void serialize_vec_with_null_map(std::vector& keys, size_t num_rows, - const uint8_t* null_map, - size_t max_row_byte_size) const override { - data->serialize_vec_with_null_map(keys, num_rows, null_map, max_row_byte_size); + const uint8_t* null_map) const override { + data->serialize_vec_with_null_map(keys, num_rows, null_map); } void update_hash_with_value(size_t, SipHash& hash) const override { diff --git a/be/src/vec/columns/column_decimal.cpp b/be/src/vec/columns/column_decimal.cpp index ac48bc1cc601c0..edc8a5777fff86 100644 --- a/be/src/vec/columns/column_decimal.cpp +++ b/be/src/vec/columns/column_decimal.cpp @@ -86,8 +86,7 @@ void ColumnDecimal::serialize_vec(std::vector& keys, size_t num_ro template void ColumnDecimal::serialize_vec_with_null_map(std::vector& keys, size_t num_rows, - const uint8_t* null_map, - size_t max_row_byte_size) const { + const uint8_t* null_map) const { for (size_t i = 0; i < num_rows; ++i) { if (null_map[i] == 0) { memcpy(const_cast(keys[i].data + keys[i].size), &data[i], sizeof(T)); diff --git a/be/src/vec/columns/column_decimal.h b/be/src/vec/columns/column_decimal.h index ca38303ae288dc..e86a12dc3cb194 100644 --- a/be/src/vec/columns/column_decimal.h +++ b/be/src/vec/columns/column_decimal.h @@ -159,14 +159,13 @@ class ColumnDecimal final : public COWHelper& keys, size_t num_rows, - size_t max_row_byte_size) const override; + void serialize_vec(std::vector& keys, size_t num_rows, + size_t max_row_byte_size) const override; - virtual void serialize_vec_with_null_map(std::vector& keys, size_t num_rows, - const uint8_t* null_map, - size_t max_row_byte_size) const override; + void serialize_vec_with_null_map(std::vector& keys, size_t num_rows, + const uint8_t* null_map) const override; void deserialize_vec(std::vector& keys, const size_t num_rows) override; diff --git 
a/be/src/vec/columns/column_nullable.cpp b/be/src/vec/columns/column_nullable.cpp index 8a3ff18a935f7c..b3386a4fe6df99 100644 --- a/be/src/vec/columns/column_nullable.cpp +++ b/be/src/vec/columns/column_nullable.cpp @@ -262,7 +262,7 @@ void ColumnNullable::serialize_vec(std::vector& keys, size_t num_rows keys[i].size += s; } - get_nested_column().serialize_vec_with_null_map(keys, num_rows, arr.data(), max_row_byte_size); + get_nested_column().serialize_vec_with_null_map(keys, num_rows, arr.data()); } void ColumnNullable::deserialize_vec(std::vector& keys, const size_t num_rows) { diff --git a/be/src/vec/columns/column_string.cpp b/be/src/vec/columns/column_string.cpp index cced90f90350f3..580790916b6fe9 100644 --- a/be/src/vec/columns/column_string.cpp +++ b/be/src/vec/columns/column_string.cpp @@ -310,8 +310,7 @@ void ColumnString::serialize_vec(std::vector& keys, size_t num_rows, } void ColumnString::serialize_vec_with_null_map(std::vector& keys, size_t num_rows, - const uint8_t* null_map, - size_t max_row_byte_size) const { + const uint8_t* null_map) const { for (size_t i = 0; i < num_rows; ++i) { if (null_map[i] == 0) { uint32_t offset(offset_at(i)); diff --git a/be/src/vec/columns/column_string.h b/be/src/vec/columns/column_string.h index 2701a39e898375..99dc9fa8714165 100644 --- a/be/src/vec/columns/column_string.h +++ b/be/src/vec/columns/column_string.h @@ -389,8 +389,7 @@ class ColumnString final : public COWHelper { size_t max_row_byte_size) const override; void serialize_vec_with_null_map(std::vector& keys, size_t num_rows, - const uint8_t* null_map, - size_t max_row_byte_size) const override; + const uint8_t* null_map) const override; void deserialize_vec_with_null_map(std::vector& keys, const size_t num_rows, const uint8_t* null_map) override; diff --git a/be/src/vec/columns/column_vector.cpp b/be/src/vec/columns/column_vector.cpp index 6da7a549bfe401..d61b4a831ae1b0 100644 --- a/be/src/vec/columns/column_vector.cpp +++ b/be/src/vec/columns/column_vector.cpp @@ -34,6 +34,7 @@ #include "vec/common/arena.h" #include "vec/common/assert_cast.h" #include "vec/common/bit_cast.h" +#include "vec/common/memcpy_small.h" #include "vec/common/nan_utils.h" #include "vec/common/radix_sort.h" #include "vec/common/sip_hash.h" @@ -66,18 +67,17 @@ template void ColumnVector::serialize_vec(std::vector& keys, size_t num_rows, size_t max_row_byte_size) const { for (size_t i = 0; i < num_rows; ++i) { - memcpy(const_cast(keys[i].data + keys[i].size), &data[i], sizeof(T)); + memcpy_fixed(const_cast(keys[i].data + keys[i].size), (char*)&data[i]); keys[i].size += sizeof(T); } } template void ColumnVector::serialize_vec_with_null_map(std::vector& keys, size_t num_rows, - const uint8_t* null_map, - size_t max_row_byte_size) const { + const uint8_t* null_map) const { for (size_t i = 0; i < num_rows; ++i) { if (null_map[i] == 0) { - memcpy(const_cast(keys[i].data + keys[i].size), &data[i], sizeof(T)); + memcpy_fixed(const_cast(keys[i].data + keys[i].size), (char*)&data[i]); keys[i].size += sizeof(T); } } diff --git a/be/src/vec/columns/column_vector.h b/be/src/vec/columns/column_vector.h index ec1369610223ea..0b4192626aeeee 100644 --- a/be/src/vec/columns/column_vector.h +++ b/be/src/vec/columns/column_vector.h @@ -269,8 +269,7 @@ class ColumnVector final : public COWHelper> size_t max_row_byte_size) const override; void serialize_vec_with_null_map(std::vector& keys, size_t num_rows, - const uint8_t* null_map, - size_t max_row_byte_size) const override; + const uint8_t* null_map) const override; void 
update_xxHash_with_value(size_t start, size_t end, uint64_t& hash, const uint8_t* __restrict null_data) const override { diff --git a/be/src/vec/common/aggregation_common.h b/be/src/vec/common/aggregation_common.h deleted file mode 100644 index 022823e6aaf994..00000000000000 --- a/be/src/vec/common/aggregation_common.h +++ /dev/null @@ -1,148 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. -// This file is copied from -// https://github.com/ClickHouse/ClickHouse/blob/master/src/Interpreters/AggregationCommon.h -// and modified by Doris - -#pragma once - -#include - -#include "vec/columns/column.h" -#include "vec/columns/columns_number.h" -#include "vec/common/arena.h" -#include "vec/common/assert_cast.h" -#include "vec/common/hash_table/hash.h" -#include "vec/common/memcpy_small.h" -#include "vec/common/sip_hash.h" -#include "vec/common/string_ref.h" -#include "vec/common/uint128.h" - -namespace doris::vectorized { - -inline size_t get_bitmap_size(size_t key_number) { - return (key_number + 7) / 8; -} - -using Sizes = std::vector; - -template -std::vector pack_fixeds(size_t row_numbers, const ColumnRawPtrs& key_columns, - const Sizes& key_sizes, const ColumnRawPtrs& nullmap_columns) { - size_t bitmap_size = get_bitmap_size(nullmap_columns.size()); - - std::vector result(row_numbers); - size_t offset = 0; - if (bitmap_size > 0) { - for (size_t j = 0; j < nullmap_columns.size(); j++) { - if (!nullmap_columns[j]) { - continue; - } - size_t bucket = j / 8; - size_t offset = j % 8; - const auto& data = - assert_cast(*nullmap_columns[j]).get_data().data(); - for (size_t i = 0; i < row_numbers; ++i) { - *((char*)(&result[i]) + bucket) |= data[i] << offset; - } - } - offset += bitmap_size; - } - - for (size_t j = 0; j < key_columns.size(); ++j) { - const char* data = key_columns[j]->get_raw_data().data; - - auto foo = [&](Fixed zero) { - CHECK_EQ(sizeof(Fixed), key_sizes[j]); - if (nullmap_columns.size() && nullmap_columns[j]) { - const auto& nullmap = - assert_cast(*nullmap_columns[j]).get_data().data(); - for (size_t i = 0; i < row_numbers; ++i) { - // make sure null cell is filled by 0x0 - memcpy_fixed((char*)(&result[i]) + offset, - nullmap[i] ? 
(char*)&zero : data + i * sizeof(Fixed)); - } - } else { - for (size_t i = 0; i < row_numbers; ++i) { - memcpy_fixed((char*)(&result[i]) + offset, data + i * sizeof(Fixed)); - } - } - }; - - if (key_sizes[j] == 1) { - foo(int8_t()); - } else if (key_sizes[j] == 2) { - foo(int16_t()); - } else if (key_sizes[j] == 4) { - foo(int32_t()); - } else if (key_sizes[j] == 8) { - foo(int64_t()); - } else if (key_sizes[j] == 16) { - foo(UInt128()); - } else { - throw Exception(ErrorCode::INTERNAL_ERROR, - "pack_fixeds input invalid key size, key_size={}", key_sizes[j]); - } - offset += key_sizes[j]; - } - return result; -} - -/// Hash a set of keys into a UInt128 value. -inline UInt128 hash128(size_t i, size_t keys_size, const ColumnRawPtrs& key_columns) { - UInt128 key; - SipHash hash; - - for (size_t j = 0; j < keys_size; ++j) { - key_columns[j]->update_hash_with_value(i, hash); - } - - hash.get128(key.low, key.high); - - return key; -} - -/// Copy keys to the pool. Then put into pool StringRefs to them and return the pointer to the first. -inline StringRef* place_keys_in_pool(size_t keys_size, StringRefs& keys, Arena& pool) { - for (size_t j = 0; j < keys_size; ++j) { - char* place = pool.alloc(keys[j].size); - memcpy_small_allow_read_write_overflow15(place, keys[j].data, keys[j].size); - keys[j].data = place; - } - - /// Place the StringRefs on the newly copied keys in the pool. - char* res = pool.aligned_alloc(keys_size * sizeof(StringRef), alignof(StringRef)); - memcpy_small_allow_read_write_overflow15(res, keys.data(), keys_size * sizeof(StringRef)); - - return reinterpret_cast(res); -} - -/** Serialize keys into a continuous chunk of memory. - */ -inline StringRef serialize_keys_to_pool_contiguous(size_t i, size_t keys_size, - const ColumnRawPtrs& key_columns, Arena& pool) { - const char* begin = nullptr; - - size_t sum_size = 0; - for (size_t j = 0; j < keys_size; ++j) { - sum_size += key_columns[j]->serialize_value_into_arena(i, pool, begin).size; - } - - return {begin, sum_size}; -} - -} // namespace doris::vectorized diff --git a/be/src/vec/common/columns_hashing.h b/be/src/vec/common/columns_hashing.h index 21611e7d829544..824ac2a37cc0a1 100644 --- a/be/src/vec/common/columns_hashing.h +++ b/be/src/vec/common/columns_hashing.h @@ -29,92 +29,42 @@ #include "vec/common/assert_cast.h" #include "vec/common/columns_hashing_impl.h" #include "vec/common/hash_table/hash_table.h" -#include "vec/common/hash_table/hash_table_key_holder.h" #include "vec/common/hash_table/ph_hash_map.h" #include "vec/common/string_ref.h" #include "vec/common/unaligned.h" namespace doris::vectorized { +using Sizes = std::vector; namespace ColumnsHashing { /// For the case when there is one numeric key. /// UInt8/16/32/64 for any type with corresponding bit width. -template -struct HashMethodOneNumber : public columns_hashing_impl::HashMethodBase< - HashMethodOneNumber, - Value, Mapped, use_cache> { - using Self = HashMethodOneNumber; - using Base = columns_hashing_impl::HashMethodBase; - - const char* vec; - size_t size; +template +struct HashMethodOneNumber + : public columns_hashing_impl::HashMethodBase, + Value, Mapped, false> { + using Self = HashMethodOneNumber; + using Base = columns_hashing_impl::HashMethodBase; /// If the keys of a fixed length then key_sizes contains their lengths, empty otherwise. 
- HashMethodOneNumber(const ColumnRawPtrs& key_columns, const Sizes& /*key_sizes*/, - const HashMethodContextPtr&) { - vec = key_columns[0]->get_raw_data().data; - size = key_columns[0]->size(); - } - - HashMethodOneNumber(const IColumn* column) { vec = column->get_raw_data().data; } + HashMethodOneNumber(const ColumnRawPtrs& key_columns, const Sizes& /*key_sizes*/) {} - /// Creates context. Method is called once and result context is used in all threads. - using Base::createContext; /// (const HashMethodContext::Settings &) -> HashMethodContextPtr - - /// Emplace key into HashTable or HashMap. If Data is HashMap, returns ptr to value, otherwise nullptr. - /// Data is a HashTable where to insert key from column's row. - /// For Serialized method, key may be placed in pool. - using Base::emplace_key; /// (Data & data, size_t row, Arena & pool) -> EmplaceResult - - /// Find key into HashTable or HashMap. If Data is HashMap and key was found, returns ptr to value, otherwise nullptr. - using Base::find_key; /// (Data & data, size_t row, Arena & pool) -> FindResult using Base::find_key_with_hash; - - /// Is used for default implementation in HashMethodBase. - FieldType get_key_holder(size_t row, Arena&) const { return ((FieldType*)(vec))[row]; } - FieldType pack_key_holder(FieldType key, Arena&) const { return key; } - - std::span get_keys() const { return std::span((FieldType*)vec, size); } }; /// For the case when there is one string key. -template -struct HashMethodString : public columns_hashing_impl::HashMethodBase< - HashMethodString, - Value, Mapped, use_cache> { - using Self = HashMethodString; - using Base = columns_hashing_impl::HashMethodBase; - - const IColumn::Offset* offsets; - const UInt8* chars; - std::vector keys; - - HashMethodString(const ColumnRawPtrs& key_columns, const Sizes& /*key_sizes*/, - const HashMethodContextPtr&) { - const IColumn& column = *key_columns[0]; - const ColumnString& column_string = assert_cast(column); - offsets = column_string.get_offsets().data(); - chars = column_string.get_chars().data(); - - keys.resize(column_string.size()); - for (size_t row = 0; row < column_string.size(); row++) { - keys[row] = StringRef(chars + offsets[row - 1], offsets[row] - offsets[row - 1]); - } - } - - auto get_key_holder(ssize_t row, [[maybe_unused]] Arena& pool) const { - if constexpr (place_string_to_arena) { - return ArenaKeyHolder {keys[row], pool}; - } else { - return keys[row]; - } - } +template +struct HashMethodString + : public columns_hashing_impl::HashMethodBase< + HashMethodString, Value, Mapped, false> { + using Self = HashMethodString; + using Base = columns_hashing_impl::HashMethodBase; - const std::vector& get_keys() const { return keys; } + HashMethodString(const ColumnRawPtrs& key_columns, const Sizes& /*key_sizes*/) {} protected: - friend class columns_hashing_impl::HashMethodBase; + friend class columns_hashing_impl::HashMethodBase; }; /** Hash by concatenating serialized key values. @@ -122,117 +72,41 @@ struct HashMethodString : public columns_hashing_impl::HashMethodBase< * That is, for example, for strings, it contains first the serialized length of the string, and then the bytes. * Therefore, when aggregating by several strings, there is no ambiguity. 
*/ -template +template struct HashMethodSerialized - : public columns_hashing_impl::HashMethodBase< - HashMethodSerialized, Value, Mapped, false> { - using Self = HashMethodSerialized; + : public columns_hashing_impl::HashMethodBase, Value, + Mapped, false> { + using Self = HashMethodSerialized; using Base = columns_hashing_impl::HashMethodBase; - using KeyHolderType = - std::conditional_t; - - ColumnRawPtrs key_columns; - size_t keys_size; - StringRef* keys; - - HashMethodSerialized(const ColumnRawPtrs& key_columns_, const Sizes& /*key_sizes*/, - const HashMethodContextPtr&) - : key_columns(key_columns_), keys_size(key_columns_.size()) {} - - void set_serialized_keys(StringRef* keys_) { keys = keys_; } - - ALWAYS_INLINE KeyHolderType get_key_holder(size_t row, Arena& pool) const { - if constexpr (keys_pre_serialized) { - return KeyHolderType {keys[row], pool}; - } else { - return KeyHolderType { - serialize_keys_to_pool_contiguous(row, keys_size, key_columns, pool), pool}; - } - } - KeyHolderType pack_key_holder(StringRef key, Arena& pool) const { - return KeyHolderType {key, pool}; - } - - std::span get_keys() const { - return std::span(keys, key_columns[0]->size()); - } + HashMethodSerialized(const ColumnRawPtrs& key_columns_, const Sizes& /*key_sizes*/) {} protected: friend class columns_hashing_impl::HashMethodBase; }; -template -struct IsPreSerializedKeysHashMethodTraits { - constexpr static bool value = false; -}; - -template -struct IsPreSerializedKeysHashMethodTraits> { - constexpr static bool value = true; -}; - -/// For the case when there is one string key. -template -struct HashMethodHashed - : public columns_hashing_impl::HashMethodBase, - Value, Mapped, use_cache> { - using Key = UInt128; - using Self = HashMethodHashed; - using Base = columns_hashing_impl::HashMethodBase; - - ColumnRawPtrs key_columns; - - HashMethodHashed(ColumnRawPtrs key_columns_, const Sizes&, const HashMethodContextPtr&) - : key_columns(std::move(key_columns_)) {} - - ALWAYS_INLINE Key get_key_holder(size_t row, Arena&) const { - return hash128(row, key_columns.size(), key_columns); - } -}; - /// For the case when all keys are of fixed length, and they fit in N (for example, 128) bits. 
-template +template struct HashMethodKeysFixed - : private columns_hashing_impl::BaseStateKeysFixed, + : private columns_hashing_impl::BaseStateKeysFixed, public columns_hashing_impl::HashMethodBase< - HashMethodKeysFixed, Value, - Mapped, use_cache> { - using Self = HashMethodKeysFixed; - using BaseHashed = columns_hashing_impl::HashMethodBase; - using Base = columns_hashing_impl::BaseStateKeysFixed; - - const Sizes& key_sizes; - size_t keys_size; - std::vector keys; - - HashMethodKeysFixed(const ColumnRawPtrs& key_columns, const Sizes& key_sizes_, - const HashMethodContextPtr&) - : Base(key_columns), key_sizes(key_sizes_), keys_size(key_columns.size()) { - keys = pack_fixeds(key_columns[0]->size(), Base::get_actual_columns(), key_sizes, - Base::get_nullmap_columns()); - } - - ALWAYS_INLINE Key get_key_holder(size_t row, Arena&) const { return keys[row]; } - - Key pack_key_holder(Key key, Arena& pool) const { return key; } - - const std::vector& get_keys() const { return keys; } + HashMethodKeysFixed, Value, Mapped, + false> { + using Self = HashMethodKeysFixed; + using BaseHashed = columns_hashing_impl::HashMethodBase; + using Base = columns_hashing_impl::BaseStateKeysFixed; + + HashMethodKeysFixed(const ColumnRawPtrs& key_columns, const Sizes& key_sizes_) + : Base(key_columns) {} }; -template +template struct HashMethodSingleLowNullableColumn : public SingleColumnMethod { using Base = SingleColumnMethod; static constexpr bool has_mapped = !std::is_same::value; - using EmplaceResult = columns_hashing_impl::EmplaceResultImpl; using FindResult = columns_hashing_impl::FindResultImpl; - static HashMethodContextPtr createContext(const HashMethodContext::Settings& settings) { - return nullptr; - } - const ColumnNullable* key_column; static const ColumnRawPtrs get_nested_column(const IColumn* col) { @@ -243,158 +117,28 @@ struct HashMethodSingleLowNullableColumn : public SingleColumnMethod { } HashMethodSingleLowNullableColumn(const ColumnRawPtrs& key_columns_nullable, - const Sizes& key_sizes, const HashMethodContextPtr& context) - : Base(get_nested_column(key_columns_nullable[0]), key_sizes, context), + const Sizes& key_sizes) + : Base(get_nested_column(key_columns_nullable[0]), key_sizes), key_column(assert_cast(key_columns_nullable[0])) {} - template - ALWAYS_INLINE EmplaceResult emplace_key(Data& data, size_t row, Arena& pool) { - if (key_column->is_null_at(row)) { - bool has_null_key = data.has_null_key_data(); - data.has_null_key_data() = true; - - if constexpr (has_mapped) - return EmplaceResult(data.get_null_key_data(), data.get_null_key_data(), - !has_null_key); - else - return EmplaceResult(!has_null_key); - } - - auto key_holder = Base::get_key_holder(row, pool); - - bool inserted = false; - typename Data::LookupResult it; - data.emplace(key_holder, it, inserted); - - if constexpr (has_mapped) { - auto& mapped = *lookup_result_get_mapped(it); - if (inserted) { - new (&mapped) Mapped(); - } - return EmplaceResult(mapped, mapped, inserted); - } else { - return EmplaceResult(inserted); - } - } - - template - ALWAYS_INLINE EmplaceResult emplace_with_key(Data& data, KeyHolder&& key, size_t row) { - if (key_column->is_null_at(row)) { - bool has_null_key = data.has_null_key_data(); - data.has_null_key_data() = true; - - if constexpr (has_mapped) { - return EmplaceResult(data.get_null_key_data(), data.get_null_key_data(), - !has_null_key); - } else { - return EmplaceResult(!has_null_key); - } - } - - bool inserted = false; - typename Data::LookupResult it; - data.emplace(key, it, inserted); 
- - if constexpr (has_mapped) { - auto& mapped = *lookup_result_get_mapped(it); - if (inserted) { - new (&mapped) Mapped(); - } - return EmplaceResult(mapped, mapped, inserted); - } else { - return EmplaceResult(inserted); - } - } - - template - EmplaceResult emplace_with_key(Data& data, KeyHolder&& key, size_t hash_value, size_t row) { - if (key_column->is_null_at(row)) { - bool has_null_key = data.has_null_key_data(); - data.has_null_key_data() = true; - - if constexpr (has_mapped) { - return EmplaceResult(data.get_null_key_data(), data.get_null_key_data(), - !has_null_key); - } else { - return EmplaceResult(!has_null_key); - } - } - - bool inserted = false; - typename Data::LookupResult it; - data.emplace(key, it, hash_value, inserted); - - if constexpr (has_mapped) { - auto& mapped = *lookup_result_get_mapped(it); - if (inserted) { - new (&mapped) Mapped(); - } - return EmplaceResult(mapped, mapped, inserted); - } else { - return EmplaceResult(inserted); - } - } - - template + template requires has_mapped - ALWAYS_INLINE Mapped& lazy_emplace_key(Data& data, size_t row, Arena& pool, Func&& f, + ALWAYS_INLINE Mapped& lazy_emplace_key(Data& data, size_t row, KeyHolder&& key, + size_t hash_value, Func&& f, CreatorForNull&& null_creator) { if (key_column->is_null_at(row)) { bool has_null_key = data.has_null_key_data(); data.has_null_key_data() = true; - if (!has_null_key) std::forward(null_creator)(data.get_null_key_data()); - return data.get_null_key_data(); - } - auto key_holder = Base::get_key_holder(row, pool); - typename Data::LookupResult it; - data.lazy_emplace(key_holder, it, std::forward(f)); - return *lookup_result_get_mapped(it); - } - - template - requires has_mapped - ALWAYS_INLINE Mapped& lazy_emplace_key(Data& data, size_t row, Arena& pool, size_t hash_value, - Func&& f, CreatorForNull&& null_creator) { - if (key_column->is_null_at(row)) { - bool has_null_key = data.has_null_key_data(); - data.has_null_key_data() = true; - if (!has_null_key) std::forward(null_creator)(data.get_null_key_data()); - return data.get_null_key_data(); + if (!has_null_key) { + std::forward(null_creator)( + data.template get_null_key_data()); + } + return data.template get_null_key_data(); } - auto key_holder = Base::get_key_holder(row, pool); typename Data::LookupResult it; - data.lazy_emplace(key_holder, it, hash_value, std::forward(f)); + data.lazy_emplace(std::forward(key), it, hash_value, std::forward(f)); return *lookup_result_get_mapped(it); } - - template - ALWAYS_INLINE FindResult find_key(Data& data, size_t row, Arena& pool) { - if (key_column->is_null_at(row)) { - bool has_null_key = data.has_null_key_data(); - if constexpr (has_mapped) - return FindResult(&data.get_null_key_data(), has_null_key); - else - return FindResult(has_null_key); - } - auto key_holder = Base::get_key_holder(row, pool); - auto key = key_holder_get_key(key_holder); - auto it = data.find(key); - if constexpr (has_mapped) - return FindResult(it ? 
lookup_result_get_mapped(it) : nullptr, it != nullptr); - else - return FindResult(it != nullptr); - } -}; - -template -struct IsSingleNullableColumnMethod { - static constexpr bool value = false; -}; - -template -struct IsSingleNullableColumnMethod< - HashMethodSingleLowNullableColumn> { - static constexpr bool value = true; }; } // namespace ColumnsHashing diff --git a/be/src/vec/common/columns_hashing_impl.h b/be/src/vec/common/columns_hashing_impl.h index 2776e76d0d2002..a87f719d84764f 100644 --- a/be/src/vec/common/columns_hashing_impl.h +++ b/be/src/vec/common/columns_hashing_impl.h @@ -23,29 +23,13 @@ #include "vec/aggregate_functions/aggregate_function.h" #include "vec/columns/column.h" #include "vec/columns/column_nullable.h" -#include "vec/common/aggregation_common.h" #include "vec/common/assert_cast.h" -#include "vec/common/hash_table/hash_table_key_holder.h" #include "vec/common/hash_table/ph_hash_map.h" -// #include namespace doris::vectorized { namespace ColumnsHashing { -/// Generic context for HashMethod. Context is shared between multiple threads, all methods must be thread-safe. -/// Is used for caching. -class HashMethodContext { -public: - virtual ~HashMethodContext() = default; - - struct Settings { - size_t max_threads; - }; -}; - -using HashMethodContextPtr = std::shared_ptr; - namespace columns_hashing_impl { template @@ -68,34 +52,6 @@ struct LastElementCache { static constexpr bool consecutive_keys_optimization = false; }; -template -class EmplaceResultImpl { - Mapped& value; - Mapped& cached_value; - bool inserted; - -public: - EmplaceResultImpl(Mapped& value_, Mapped& cached_value_, bool inserted_) - : value(value_), cached_value(cached_value_), inserted(inserted_) {} - - bool is_inserted() const { return inserted; } - auto& get_mapped() const { return value; } - - void set_mapped(const Mapped& mapped) { - cached_value = mapped; - value = mapped; - } -}; - -template <> -class EmplaceResultImpl { - bool inserted; - -public: - explicit EmplaceResultImpl(bool inserted_) : inserted(inserted_) {} - bool is_inserted() const { return inserted; } -}; - template class FindResultImpl { Mapped* value; @@ -119,55 +75,17 @@ class FindResultImpl { template class HashMethodBase { public: - using EmplaceResult = EmplaceResultImpl; using FindResult = FindResultImpl; static constexpr bool has_mapped = !std::is_same_v; using Cache = LastElementCache; - static HashMethodContextPtr createContext(const HashMethodContext::Settings&) { - return nullptr; - } - - template - ALWAYS_INLINE EmplaceResult emplace_key(Data& data, size_t row, Arena& pool) { - auto key_holder = static_cast(*this).get_key_holder(row, pool); - return emplaceImpl(key_holder, data); - } - template - EmplaceResult emplace_with_key(Data& data, KeyHolder&& key, size_t row) { - return emplaceImpl(key, data); - } - - template - EmplaceResult emplace_with_key(Data& data, KeyHolder&& key, size_t hash_value, size_t row) { - return emplaceImpl(key, hash_value, data); - } - - template - requires has_mapped - ALWAYS_INLINE Mapped& lazy_emplace_key(Data& data, size_t row, Arena& pool, Func&& f) { - auto key_holder = static_cast(*this).get_key_holder(row, pool); - return lazy_emplace_impl(key_holder, data, std::forward(f)); - } - - template + template requires has_mapped - ALWAYS_INLINE Mapped& lazy_emplace_key(Data& data, size_t hash_value, size_t row, Arena& pool, - Func&& f) { - auto key_holder = static_cast(*this).get_key_holder(row, pool); - return lazy_emplace_impl(key_holder, hash_value, data, std::forward(f)); - } - - 
template - void lazy_emplace_keys(Data& data, const Keys& keys, const std::vector& hash_values, - Func&& f, AggregateDataPtr* places) { - data.lazy_emplace_keys(std::span(keys), hash_values, places, std::forward(f)); - } - - template - ALWAYS_INLINE FindResult find_key(Data& data, size_t row, Arena& pool) { - auto key_holder = static_cast(*this).get_key_holder(row, pool); - return find_key_impl(key_holder_get_key(key_holder), data); + ALWAYS_INLINE Mapped& lazy_emplace_key(Data& data, size_t row, KeyHolder&& key, + size_t hash_value, Func&& f, + CreatorForNull&& null_creator) { + return lazy_emplace_impl(std::forward(key), hash_value, data, + std::forward(f)); } template @@ -175,15 +93,6 @@ class HashMethodBase { return find_key_impl(key, hash_value, data); } - template - ALWAYS_INLINE void prefetch_by_hash(Data& data, size_t hash_value) { - data.template prefetch_by_hash(hash_value); - } - - ALWAYS_INLINE auto get_key_holder(size_t row, Arena& pool) { - return static_cast(*this).get_key_holder(row, pool); - } - protected: Cache cache; @@ -198,100 +107,6 @@ class HashMethodBase { } } - template - ALWAYS_INLINE EmplaceResult emplaceImpl(KeyHolder& key_holder, Data& data) { - if constexpr (Cache::consecutive_keys_optimization) { - if (cache.found && cache.check(key_holder_get_key(key_holder))) { - if constexpr (has_mapped) - return EmplaceResult(cache.value.second, cache.value.second, false); - else - return EmplaceResult(false); - } - } - - typename Data::LookupResult it; - bool inserted = false; - data.emplace(key_holder, it, inserted); - - [[maybe_unused]] Mapped* cached = nullptr; - if constexpr (has_mapped) cached = lookup_result_get_mapped(it); - - if (inserted) { - if constexpr (has_mapped) { - new (lookup_result_get_mapped(it)) Mapped(); - } - } - - if constexpr (consecutive_keys_optimization) { - cache.found = true; - cache.empty = false; - - if constexpr (has_mapped) { - cache.value.first = *lookup_result_get_key(it); - cache.value.second = *lookup_result_get_mapped(it); - cached = &cache.value.second; - } else { - cache.value = *lookup_result_get_key(it); - } - } - - if constexpr (has_mapped) - return EmplaceResult(*lookup_result_get_mapped(it), *cached, inserted); - else - return EmplaceResult(inserted); - } - - template - ALWAYS_INLINE EmplaceResult emplaceImpl(KeyHolder& key_holder, size_t hash_value, Data& data) { - if constexpr (Cache::consecutive_keys_optimization) { - if (cache.found && cache.check(key_holder_get_key(key_holder))) { - if constexpr (has_mapped) - return EmplaceResult(cache.value.second, cache.value.second, false); - else - return EmplaceResult(false); - } - } - - typename Data::LookupResult it; - bool inserted = false; - data.emplace(key_holder, it, inserted, hash_value); - - [[maybe_unused]] Mapped* cached = nullptr; - if constexpr (has_mapped) cached = lookup_result_get_mapped(it); - - if (inserted) { - if constexpr (has_mapped) { - new (lookup_result_get_mapped(it)) Mapped(); - } - } - - if constexpr (consecutive_keys_optimization) { - cache.found = true; - cache.empty = false; - - if constexpr (has_mapped) { - cache.value.first = *lookup_result_get_key(it); - cache.value.second = *lookup_result_get_mapped(it); - cached = &cache.value.second; - } else { - cache.value = *lookup_result_get_key(it); - } - } - - if constexpr (has_mapped) - return EmplaceResult(*lookup_result_get_mapped(it), *cached, inserted); - else - return EmplaceResult(inserted); - } - - template - requires has_mapped - ALWAYS_INLINE Mapped& lazy_emplace_impl(KeyHolder& key_holder, Data& 
data, Func&& f) { - typename Data::LookupResult it; - data.lazy_emplace(key_holder, it, std::forward(f)); - return *lookup_result_get_mapped(it); - } - template requires has_mapped ALWAYS_INLINE Mapped& lazy_emplace_impl(KeyHolder& key_holder, size_t hash_value, Data& data, @@ -302,39 +117,6 @@ class HashMethodBase { return *lookup_result_get_mapped(it); } - template - ALWAYS_INLINE FindResult find_key_impl(Key key, Data& data) { - if constexpr (Cache::consecutive_keys_optimization) { - if (cache.check(key)) { - if constexpr (has_mapped) - return FindResult(&cache.value.second, cache.found); - else - return FindResult(cache.found); - } - } - - auto it = data.find(key); - - if constexpr (consecutive_keys_optimization) { - cache.found = it != nullptr; - cache.empty = false; - - if constexpr (has_mapped) { - cache.value.first = key; - if (it) { - cache.value.second = *lookup_result_get_mapped(it); - } - } else { - cache.value = key; - } - } - - if constexpr (has_mapped) - return FindResult(it ? lookup_result_get_mapped(it) : nullptr, it != nullptr); - else - return FindResult(it != nullptr); - } - template ALWAYS_INLINE FindResult find_key_impl(Key key, size_t hash_value, Data& data) { if constexpr (Cache::consecutive_keys_optimization) { diff --git a/be/src/vec/common/hash_table/fixed_hash_map.h b/be/src/vec/common/hash_table/fixed_hash_map.h index 164411550c1fb7..a43a8381a5110a 100644 --- a/be/src/vec/common/hash_table/fixed_hash_map.h +++ b/be/src/vec/common/hash_table/fixed_hash_map.h @@ -124,27 +124,6 @@ class FixedHashMap : public FixedHashTable { using Base::Base; - template - void ALWAYS_INLINE merge_to_via_emplace(Self& that, Func&& func) { - for (auto it = this->begin(), end = this->end(); it != end; ++it) { - typename Self::LookupResult res_it; - bool inserted; - that.emplace(it->get_key(), res_it, inserted, it.get_hash()); - func(res_it->get_mapped(), it->get_mapped(), inserted); - } - } - - template - void ALWAYS_INLINE merge_to_via_find(Self& that, Func&& func) { - for (auto it = this->begin(), end = this->end(); it != end; ++it) { - auto res_it = that.find(it->get_key(), it.get_hash()); - if (!res_it) - func(it->get_mapped(), it->get_mapped(), false); - else - func(res_it->get_mapped(), it->get_mapped(), true); - } - } - template void for_each_value(Func&& func) { for (auto& v : *this) func(v.get_key(), v.get_mapped()); @@ -166,8 +145,10 @@ class FixedHashMap : public FixedHashTable { // fixed hash map never overflow bool add_elem_size_overflow(size_t add_size) const { return false; } - - char* get_null_key_data() { return nullptr; } + template + char* get_null_key_data() { + return nullptr; + } bool has_null_key_data() const { return false; } }; diff --git a/be/src/vec/common/hash_table/fixed_hash_table.h b/be/src/vec/common/hash_table/fixed_hash_table.h index c498402ac405a1..a0e901d6c3c3f7 100644 --- a/be/src/vec/common/hash_table/fixed_hash_table.h +++ b/be/src/vec/common/hash_table/fixed_hash_table.h @@ -254,10 +254,23 @@ class FixedHashTable : private boost::noncopyable, return; } - f(Constructor(&buf[x]), x); + f(Constructor(&buf[x]), x, x); this->increase_size(); } + template + void ALWAYS_INLINE lazy_emplace(const Key& x, LookupResult& it, size_t hash_value, Func&& f) { + lazy_emplace(x, it, std::forward(f)); + } + + template + void ALWAYS_INLINE prefetch(const Key& key, size_t hash_value) { + // Two optional arguments: + // 'rw': 1 means the memory access is write + // 'locality': 0-3. 0 means no temporal locality. 3 means high temporal locality. 
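+        // FixedHashTable is addressed directly by the key, so the supplied hash_value
+        // doubles as the bucket index prefetched below.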
+ __builtin_prefetch(&buf[hash_value], READ ? 0 : 1, 1); + } + std::pair ALWAYS_INLINE insert(const value_type& x) { std::pair res; emplace(Cell::get_key(x), res.first, res.second); diff --git a/be/src/vec/common/hash_table/hash_map.h b/be/src/vec/common/hash_table/hash_map.h index e3aec8129a0ffe..5b7cd6f4642bba 100644 --- a/be/src/vec/common/hash_table/hash_map.h +++ b/be/src/vec/common/hash_table/hash_map.h @@ -150,38 +150,6 @@ class HashMapTable : public HashTable { using HashTable::HashTable; - /// Merge every cell's value of current map into the destination map via emplace. - /// Func should have signature void(Mapped & dst, Mapped & src, bool emplaced). - /// Each filled cell in current map will invoke func once. If that map doesn't - /// have a key equals to the given cell, a new cell gets emplaced into that map, - /// and func is invoked with the third argument emplaced set to true. Otherwise - /// emplaced is set to false. - template - void ALWAYS_INLINE merge_to_via_emplace(Self& that, Func&& func) { - for (auto it = this->begin(), end = this->end(); it != end; ++it) { - typename Self::LookupResult res_it; - bool inserted; - that.emplace(it->get_first(), res_it, inserted, it.get_hash()); - func(*lookup_result_get_mapped(res_it), it->get_second(), inserted); - } - } - - /// Merge every cell's value of current map into the destination map via find. - /// Func should have signature void(Mapped & dst, Mapped & src, bool exist). - /// Each filled cell in current map will invoke func once. If that map doesn't - /// have a key equals to the given cell, func is invoked with the third argument - /// exist set to false. Otherwise exist is set to true. - template - void ALWAYS_INLINE merge_to_via_find(Self& that, Func&& func) { - for (auto it = this->begin(), end = this->end(); it != end; ++it) { - auto res_it = that.find(it->get_first(), it.get_hash()); - if (!res_it) - func(it->get_second(), it->get_second(), false); - else - func(*lookup_result_get_mapped(res_it), it->get_second(), true); - } - } - /// Call func(const Key &, Mapped &) for each hash map element. template void for_each_value(Func&& func) { @@ -218,7 +186,10 @@ class HashMapTable : public HashTable { return *lookup_result_get_mapped(it); } - char* get_null_key_data() { return nullptr; } + template + char* get_null_key_data() { + return nullptr; + } bool has_null_key_data() const { return false; } }; diff --git a/be/src/vec/common/hash_table/hash_map_context.h b/be/src/vec/common/hash_table/hash_map_context.h new file mode 100644 index 00000000000000..34250e6bd4f73a --- /dev/null +++ b/be/src/vec/common/hash_table/hash_map_context.h @@ -0,0 +1,487 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
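+//
+// hash_map_context.h collects the hash method "contexts" introduced by this refactor:
+// each Method* below owns its hash table (shared_ptr), the pre-serialized keys and the
+// per-row hash values, so exec nodes call init_serialized_keys()/find()/lazy_emplace()
+// instead of juggling key holders and manual prefetching.
+//
+// Simplified, illustrative call pattern (not a real call site; Mapped, make_mapped(),
+// arena, key_columns, key_sizes and rows are placeholders):
+//
+//   SerializedHashTableContext<Mapped> ctx;
+//   typename decltype(ctx)::State state(key_columns, key_sizes);
+//   ctx.init_serialized_keys(key_columns, key_sizes, rows);
+//   auto creator = [&](const auto& ctor, auto& key, auto& origin) {
+//       decltype(ctx)::try_presis_key(key, origin, arena); // copy StringRef bytes into the arena
+//       ctor(key, make_mapped());
+//   };
+//   auto null_creator = [&](auto& mapped) { mapped = make_mapped(); };
+//   for (size_t i = 0; i < rows; ++i) {
+//       ctx.lazy_emplace(state, i, creator, null_creator);
+//   }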
+ +#pragma once + +#include + +#include "runtime/descriptors.h" +#include "util/stack_util.h" +#include "vec/columns/column_nullable.h" +#include "vec/common/arena.h" +#include "vec/common/assert_cast.h" +#include "vec/common/columns_hashing.h" +#include "vec/common/hash_table/partitioned_hash_map.h" +#include "vec/common/hash_table/string_hash_map.h" +#include "vec/common/string_ref.h" +#include "vec/core/types.h" +#include "vec/utils/util.hpp" + +namespace doris::vectorized { + +template +struct DataWithNullKey; + +template +struct MethodBase { + using Key = typename HashMap::key_type; + using Mapped = typename HashMap::mapped_type; + using Value = typename HashMap::value_type; + using Iterator = typename HashMap::iterator; + + std::shared_ptr hash_table; + Iterator iterator; + bool inited_iterator = false; + Key* keys; + Arena arena; + std::vector hash_values; + + MethodBase() { hash_table.reset(new HashMap()); } + virtual ~MethodBase() = default; + + virtual void reset() { + arena.clear(); + inited_iterator = false; + } + + void init_iterator() { + if (!inited_iterator) { + inited_iterator = true; + iterator = hash_table->begin(); + } + } + virtual void init_serialized_keys(const ColumnRawPtrs& key_columns, const Sizes& key_sizes, + size_t num_rows, const uint8_t* null_map = nullptr) = 0; + + void init_hash_values(size_t num_rows, const uint8_t* null_map) { + if (null_map == nullptr) { + init_hash_values(num_rows); + return; + } + hash_values.resize(num_rows); + for (size_t k = 0; k < num_rows; ++k) { + if (null_map[k]) { + continue; + } + + hash_values[k] = hash_table->hash(keys[k]); + } + } + void init_hash_values(size_t num_rows) { + hash_values.resize(num_rows); + for (size_t k = 0; k < num_rows; ++k) { + hash_values[k] = hash_table->hash(keys[k]); + } + } + + template + void prefetch(int currrent) { + if (LIKELY(currrent + HASH_MAP_PREFETCH_DIST < hash_values.size())) { + hash_table->template prefetch(keys[currrent + HASH_MAP_PREFETCH_DIST], + hash_values[currrent + HASH_MAP_PREFETCH_DIST]); + } + } + + template + auto find(State& state, size_t i) { + prefetch(i); + return state.find_key_with_hash(*hash_table, hash_values[i], keys[i]); + } + + template + auto& lazy_emplace(State& state, size_t i, F&& creator, FF&& creator_for_null_key) { + prefetch(i); + return state.lazy_emplace_key(*hash_table, i, keys[i], hash_values[i], creator, + creator_for_null_key); + } + + static constexpr bool need_presis() { return std::is_same_v; } + + static constexpr bool is_string_hash_map() { + return std::is_same_v, HashMap> || + std::is_same_v>, HashMap>; + } + + template + static void try_presis_key(Key& key, Origin& origin, Arena& arena) { + if constexpr (need_presis()) { + origin.data = arena.insert(origin.data, origin.size); + if constexpr (!is_string_hash_map()) { + key = origin; + } + } + } + + virtual void insert_keys_into_columns(std::vector& keys, MutableColumns& key_columns, + const size_t num_rows, const Sizes&) = 0; +}; + +template +struct MethodSerialized : public MethodBase { + using Base = MethodBase; + using Base::init_iterator; + using State = ColumnsHashing::HashMethodSerialized; + using Base::try_presis_key; + + std::vector stored_keys; + + StringRef serialize_keys_to_pool_contiguous(size_t i, size_t keys_size, + const ColumnRawPtrs& key_columns, Arena& pool) { + const char* begin = nullptr; + + size_t sum_size = 0; + for (size_t j = 0; j < keys_size; ++j) { + sum_size += key_columns[j]->serialize_value_into_arena(i, pool, begin).size; + } + + return {begin, sum_size}; + } + + 
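+    // init_serialized_keys() below picks one of two layouts:
+    //  * batch: allocate max_one_row_byte_size * num_rows in the arena, give every row a
+    //    fixed-width slot and let each key column append itself via serialize_vec();
+    //  * row-by-row: if that buffer would exceed config::pre_serialize_keys_limit_bytes,
+    //    fall back to serialize_keys_to_pool_contiguous() above for each row.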
void init_serialized_keys(const ColumnRawPtrs& key_columns, const Sizes& key_sizes, + size_t num_rows, const uint8_t* null_map = nullptr) override { + Base::arena.clear(); + stored_keys.resize(num_rows); + + size_t max_one_row_byte_size = 0; + for (const auto& column : key_columns) { + max_one_row_byte_size += column->get_max_row_byte_size(); + } + size_t total_bytes = max_one_row_byte_size * num_rows; + + if (total_bytes > config::pre_serialize_keys_limit_bytes) { + // reach mem limit, don't serialize in batch + size_t keys_size = key_columns.size(); + for (size_t i = 0; i < num_rows; ++i) { + stored_keys[i] = + serialize_keys_to_pool_contiguous(i, keys_size, key_columns, Base::arena); + } + } else { + uint8_t* serialized_key_buffer = + reinterpret_cast(Base::arena.alloc(total_bytes)); + + for (size_t i = 0; i < num_rows; ++i) { + stored_keys[i].data = + reinterpret_cast(serialized_key_buffer + i * max_one_row_byte_size); + stored_keys[i].size = 0; + } + + for (const auto& column : key_columns) { + column->serialize_vec(stored_keys, num_rows, max_one_row_byte_size); + } + } + Base::keys = stored_keys.data(); + Base::init_hash_values(num_rows, null_map); + } + + void insert_keys_into_columns(std::vector& keys, MutableColumns& key_columns, + const size_t num_rows, const Sizes&) override { + for (auto& column : key_columns) { + column->deserialize_vec(keys, num_rows); + } + } +}; + +inline size_t get_bitmap_size(size_t key_number) { + return (key_number + 7) / 8; +} + +template +struct MethodStringNoCache : public MethodBase { + using Base = MethodBase; + using Base::init_iterator; + using Base::hash_table; + using State = + ColumnsHashing::HashMethodString; + + std::vector stored_keys; + + void init_serialized_keys(const ColumnRawPtrs& key_columns, const Sizes& key_sizes, + size_t num_rows, const uint8_t* null_map = nullptr) override { + const IColumn& column = *key_columns[0]; + const ColumnString& column_string = assert_cast( + column.is_nullable() + ? assert_cast(column).get_nested_column() + : column); + auto offsets = column_string.get_offsets().data(); + auto chars = column_string.get_chars().data(); + + stored_keys.resize(column_string.size()); + for (size_t row = 0; row < column_string.size(); row++) { + stored_keys[row] = StringRef(chars + offsets[row - 1], offsets[row] - offsets[row - 1]); + } + + Base::keys = stored_keys.data(); + Base::init_hash_values(num_rows, null_map); + } + + void insert_keys_into_columns(std::vector& keys, MutableColumns& key_columns, + const size_t num_rows, const Sizes&) override { + key_columns[0]->reserve(num_rows); + key_columns[0]->insert_many_strings(keys.data(), num_rows); + } +}; + +/// For the case where there is one numeric key. +/// FieldType is UInt8/16/32/64 for any type with corresponding bit width. +template +struct MethodOneNumber : public MethodBase { + using Base = MethodBase; + using Base::init_iterator; + using Base::hash_table; + using State = ColumnsHashing::HashMethodOneNumber; + + void init_serialized_keys(const ColumnRawPtrs& key_columns, const Sizes& key_sizes, + size_t num_rows, const uint8_t* null_map = nullptr) override { + Base::keys = (FieldType*)(key_columns[0]->is_nullable() + ? 
assert_cast(key_columns[0]) + ->get_nested_column_ptr() + : key_columns[0]) + ->get_raw_data() + .data; + std::string name = key_columns[0]->get_name(); + Base::init_hash_values(num_rows, null_map); + } + + void insert_keys_into_columns(std::vector& keys, + MutableColumns& key_columns, const size_t num_rows, + const Sizes&) override { + key_columns[0]->reserve(num_rows); + auto* column = static_cast(key_columns[0].get()); + for (size_t i = 0; i != num_rows; ++i) { + const auto* key_holder = reinterpret_cast(&keys[i]); + column->insert_raw_data(key_holder); + } + } +}; + +template +struct MethodKeysFixed : public MethodBase { + using Base = MethodBase; + using typename Base::Key; + using typename Base::Mapped; + using Base::keys; + using Base::hash_table; + using Base::iterator; + + using State = ColumnsHashing::HashMethodKeysFixed; + + std::vector stored_keys; + + template + std::vector pack_fixeds(size_t row_numbers, const ColumnRawPtrs& key_columns, + const Sizes& key_sizes, const ColumnRawPtrs& nullmap_columns) { + size_t bitmap_size = get_bitmap_size(nullmap_columns.size()); + + std::vector result(row_numbers); + size_t offset = 0; + if (bitmap_size > 0) { + for (size_t j = 0; j < nullmap_columns.size(); j++) { + if (!nullmap_columns[j]) { + continue; + } + size_t bucket = j / 8; + size_t offset = j % 8; + const auto& data = + assert_cast(*nullmap_columns[j]).get_data().data(); + for (size_t i = 0; i < row_numbers; ++i) { + *((char*)(&result[i]) + bucket) |= data[i] << offset; + } + } + offset += bitmap_size; + } + + for (size_t j = 0; j < key_columns.size(); ++j) { + const char* data = key_columns[j]->get_raw_data().data; + + auto foo = [&](Fixed zero) { + CHECK_EQ(sizeof(Fixed), key_sizes[j]); + if (nullmap_columns.size() && nullmap_columns[j]) { + const auto& nullmap = + assert_cast(*nullmap_columns[j]).get_data().data(); + for (size_t i = 0; i < row_numbers; ++i) { + // make sure null cell is filled by 0x0 + memcpy_fixed((char*)(&result[i]) + offset, + nullmap[i] ? 
(char*)&zero : data + i * sizeof(Fixed)); + } + } else { + for (size_t i = 0; i < row_numbers; ++i) { + memcpy_fixed((char*)(&result[i]) + offset, data + i * sizeof(Fixed)); + } + } + }; + + if (key_sizes[j] == 1) { + foo(int8_t()); + } else if (key_sizes[j] == 2) { + foo(int16_t()); + } else if (key_sizes[j] == 4) { + foo(int32_t()); + } else if (key_sizes[j] == 8) { + foo(int64_t()); + } else if (key_sizes[j] == 16) { + foo(UInt128()); + } else { + throw Exception(ErrorCode::INTERNAL_ERROR, + "pack_fixeds input invalid key size, key_size={}", key_sizes[j]); + } + offset += key_sizes[j]; + } + return result; + } + + void init_serialized_keys(const ColumnRawPtrs& key_columns, const Sizes& key_sizes, + size_t num_rows, const uint8_t* null_map = nullptr) override { + ColumnRawPtrs actual_columns; + ColumnRawPtrs null_maps; + if (has_nullable_keys) { + actual_columns.reserve(key_columns.size()); + null_maps.reserve(key_columns.size()); + for (const auto& col : key_columns) { + if (auto* nullable_col = check_and_get_column(col)) { + actual_columns.push_back(&nullable_col->get_nested_column()); + null_maps.push_back(&nullable_col->get_null_map_column()); + } else { + actual_columns.push_back(col); + null_maps.push_back(nullptr); + } + } + } else { + actual_columns = key_columns; + } + stored_keys = pack_fixeds(num_rows, actual_columns, key_sizes, null_maps); + Base::keys = stored_keys.data(); + Base::init_hash_values(num_rows, null_map); + } + + void insert_keys_into_columns(std::vector& keys, + MutableColumns& key_columns, const size_t num_rows, + const Sizes& key_sizes) override { + // In any hash key value, column values to be read start just after the bitmap, if it exists. + size_t pos = has_nullable_keys ? get_bitmap_size(key_columns.size()) : 0; + + for (size_t i = 0; i < key_columns.size(); ++i) { + size_t size = key_sizes[i]; + char* data = nullptr; + key_columns[i]->resize(num_rows); + // If we have a nullable column, get its nested column and its null map. + if (is_column_nullable(*key_columns[i])) { + ColumnNullable& nullable_col = assert_cast(*key_columns[i]); + + data = const_cast(nullable_col.get_nested_column().get_raw_data().data); + UInt8* nullmap = assert_cast(&nullable_col.get_null_map_column()) + ->get_data() + .data(); + + // The current column is nullable. Check if the value of the + // corresponding key is nullable. Update the null map accordingly. 
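+                // Bit layout matches pack_fixeds(): the null flag of key column i lives
+                // in byte i / 8, bit i % 8 of the packed key's leading bitmap.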
+ size_t bucket = i / 8; + size_t offset = i % 8; + for (size_t j = 0; j < num_rows; j++) { + nullmap[j] = (reinterpret_cast(&keys[j])[bucket] >> offset) & 1; + } + } else { + data = const_cast(key_columns[i]->get_raw_data().data); + } + + auto foo = [&](Fixed zero) { + CHECK_EQ(sizeof(Fixed), size); + for (size_t j = 0; j < num_rows; j++) { + memcpy_fixed(data + j * sizeof(Fixed), (char*)(&keys[j]) + pos); + } + }; + + if (size == 1) { + foo(int8_t()); + } else if (size == 2) { + foo(int16_t()); + } else if (size == 4) { + foo(int32_t()); + } else if (size == 8) { + foo(int64_t()); + } else if (size == 16) { + foo(UInt128()); + } else { + throw Exception(ErrorCode::INTERNAL_ERROR, + "pack_fixeds input invalid key size, key_size={}", size); + } + + pos += size; + } + } +}; + +template +struct DataWithNullKey : public Base { + bool& has_null_key_data() { return has_null_key; } + bool has_null_key_data() const { return has_null_key; } + template + MappedType& get_null_key_data() const { + return (MappedType&)null_key_data; + } + size_t size() const { return Base::size() + has_null_key; } + bool empty() const { return Base::empty() && !has_null_key; } + + void clear() { + Base::clear(); + has_null_key = false; + } + + void clear_and_shrink() { + Base::clear_and_shrink(); + has_null_key = false; + } + +private: + bool has_null_key = false; + // null_key_data store AggregateDataPtr on agg node, store PartitionBlocks on partition sort node. + void* null_key_data = nullptr; +}; + +/// Single low cardinality column. +template +struct MethodSingleNullableColumn : public SingleColumnMethod { + using Base = SingleColumnMethod; + using State = ColumnsHashing::HashMethodSingleLowNullableColumn; + + void insert_keys_into_columns(std::vector& keys, + MutableColumns& key_columns, const size_t num_rows, + const Sizes&) override { + auto col = key_columns[0].get(); + col->reserve(num_rows); + if constexpr (std::is_same_v) { + col->insert_many_strings(keys.data(), num_rows); + } else { + col->insert_many_raw_data(reinterpret_cast(keys.data()), num_rows); + } + } +}; + +template +using SerializedHashTableContext = MethodSerialized>; + +template +using PrimaryTypeHashTableContext = + MethodOneNumber>>; + +template +using FixedKeyHashTableContext = + MethodKeysFixed>, has_null>; + +} // namespace doris::vectorized \ No newline at end of file diff --git a/be/src/vec/common/hash_table/hash_table.h b/be/src/vec/common/hash_table/hash_table.h index 19ae8e2ed3b73a..47c9d62e01ff14 100644 --- a/be/src/vec/common/hash_table/hash_table.h +++ b/be/src/vec/common/hash_table/hash_table.h @@ -30,7 +30,6 @@ #include "common/status.h" #include "util/runtime_profile.h" #include "vec/common/hash_table/hash_table_allocator.h" -#include "vec/common/hash_table/hash_table_key_holder.h" #include "vec/core/types.h" #include "vec/io/io_helper.h" @@ -719,9 +718,9 @@ class HashTable : private boost::noncopyable, return false; } - template - bool ALWAYS_INLINE lazy_emplace_if_zero(const Key& x, LookupResult& it, size_t hash_value, - Func&& f) { + template + bool ALWAYS_INLINE lazy_emplace_if_zero(Key& x, Origin& origin, LookupResult& it, + size_t hash_value, Func&& f) { /// If it is claimed that the zero key can not be inserted into the table. 
if (!Cell::need_zero_value_storage) return false; @@ -730,7 +729,7 @@ class HashTable : private boost::noncopyable, if (!this->get_has_zero()) { ++m_size; this->set_get_has_zero(); - std::forward(f)(Constructor(it), x); + std::forward(f)(Constructor(it), x, origin); this->zero_value()->set_hash(hash_value); } @@ -741,19 +740,15 @@ class HashTable : private boost::noncopyable, } template - void ALWAYS_INLINE emplace_non_zero_impl(size_t place_value, KeyHolder&& key_holder, - LookupResult& it, bool& inserted, size_t hash_value) { + void ALWAYS_INLINE emplace_non_zero_impl(size_t place_value, KeyHolder&& key, LookupResult& it, + bool& inserted, size_t hash_value) { it = &buf[place_value]; if (!buf[place_value].is_zero(*this)) { - key_holder_discard_key(key_holder); inserted = false; return; } - key_holder_persist_key(key_holder); - const auto& key = key_holder_get_key(key_holder); - new (&buf[place_value]) Cell(key, *this); buf[place_value].set_hash(hash_value); inserted = true; @@ -781,20 +776,17 @@ class HashTable : private boost::noncopyable, } } - template - void ALWAYS_INLINE lazy_emplace_non_zero_impl(size_t place_value, KeyHolder&& key_holder, - LookupResult& it, size_t hash_value, Func&& f) { + template + void ALWAYS_INLINE lazy_emplace_non_zero_impl(size_t place_value, KeyHolder&& key, + Origin&& origin, LookupResult& it, + size_t hash_value, Func&& f) { it = &buf[place_value]; if (!buf[place_value].is_zero(*this)) { - key_holder_discard_key(key_holder); return; } - key_holder_persist_key(key_holder); - const auto& key = key_holder_get_key(key_holder); - - f(Constructor(&buf[place_value]), key); + f(Constructor(&buf[place_value]), key, origin); buf[place_value].set_hash(hash_value); ++m_size; @@ -822,19 +814,17 @@ class HashTable : private boost::noncopyable, /// Only for non-zero keys. Find the right place, insert the key there, if it does not already exist. Set iterator to the cell in output parameter. template - void ALWAYS_INLINE emplace_non_zero(KeyHolder&& key_holder, LookupResult& it, bool& inserted, + void ALWAYS_INLINE emplace_non_zero(KeyHolder&& key, LookupResult& it, bool& inserted, size_t hash_value) { - const auto& key = key_holder_get_key(key_holder); size_t place_value = find_cell(key, hash_value, grower.place(hash_value)); - emplace_non_zero_impl(place_value, key_holder, it, inserted, hash_value); + emplace_non_zero_impl(place_value, key, it, inserted, hash_value); } - template - void ALWAYS_INLINE lazy_emplace_non_zero(KeyHolder&& key_holder, LookupResult& it, + template + void ALWAYS_INLINE lazy_emplace_non_zero(KeyHolder&& key, Origin&& origin, LookupResult& it, size_t hash_value, Func&& f) { - const auto& key = key_holder_get_key(key_holder); size_t place_value = find_cell(key, hash_value, grower.place(hash_value)); - lazy_emplace_non_zero_impl(place_value, key_holder, it, hash_value, std::forward(f)); + lazy_emplace_non_zero_impl(place_value, key, origin, it, hash_value, std::forward(f)); } public: @@ -859,7 +849,7 @@ class HashTable : private boost::noncopyable, } template - void ALWAYS_INLINE prefetch_by_hash(size_t hash_value) { + void ALWAYS_INLINE prefetch(size_t hash_value) { // Two optional arguments: // 'rw': 1 means the memory access is write // 'locality': 0-3. 0 means no temporal locality. 3 means high temporal locality. @@ -867,6 +857,11 @@ class HashTable : private boost::noncopyable, __builtin_prefetch(&buf[place_value], READ ? 
0 : 1, 1); } + template + void ALWAYS_INLINE prefetch(const Key& key, size_t hash_value) { + prefetch(hash_value); + } + /// Reinsert node pointed to by iterator void ALWAYS_INLINE reinsert(iterator& it, size_t hash_value) { reinsert(*it.get_ptr(), hash_value); @@ -902,17 +897,16 @@ class HashTable : private boost::noncopyable, * new(&it->second) Mapped(value); */ template - void ALWAYS_INLINE emplace(KeyHolder&& key_holder, LookupResult& it, bool& inserted) { - const auto& key = key_holder_get_key(key_holder); - emplace(key_holder, it, inserted, hash(key)); + void ALWAYS_INLINE emplace(KeyHolder&& key, LookupResult& it, bool& inserted) { + emplace(key, it, inserted, hash(key)); } template - void ALWAYS_INLINE emplace(KeyHolder&& key_holder, LookupResult& it, bool& inserted, + void ALWAYS_INLINE emplace(KeyHolder&& key, LookupResult& it, bool& inserted, size_t hash_value) { - const auto& key = key_holder_get_key(key_holder); - if (!emplace_if_zero(key, it, inserted, hash_value)) - emplace_non_zero(key_holder, it, inserted, hash_value); + if (!emplace_if_zero(key, it, inserted, hash_value)) { + emplace_non_zero(key, it, inserted, hash_value); + } } template @@ -922,17 +916,24 @@ class HashTable : private boost::noncopyable, } template - void ALWAYS_INLINE lazy_emplace(KeyHolder&& key_holder, LookupResult& it, Func&& f) { - const auto& key = key_holder_get_key(key_holder); - lazy_emplace(key_holder, it, hash(key), std::forward(f)); + void ALWAYS_INLINE lazy_emplace(KeyHolder&& key, LookupResult& it, Func&& f) { + lazy_emplace(key, it, hash(key), std::forward(f)); } template - void ALWAYS_INLINE lazy_emplace(KeyHolder&& key_holder, LookupResult& it, size_t hash_value, + void ALWAYS_INLINE lazy_emplace(KeyHolder&& key, LookupResult& it, size_t hash_value, Func&& f) { - const auto& key = key_holder_get_key(key_holder); - if (!lazy_emplace_if_zero(key, it, hash_value, std::forward(f))) - lazy_emplace_non_zero(key_holder, it, hash_value, std::forward(f)); + if (!lazy_emplace_if_zero(key, key, it, hash_value, std::forward(f))) { + lazy_emplace_non_zero(key, key, it, hash_value, std::forward(f)); + } + } + + template + void ALWAYS_INLINE lazy_emplace_with_origin(KeyHolder&& key, Origin&& origin, LookupResult& it, + size_t hash_value, Func&& f) { + if (!lazy_emplace_if_zero(key, origin, it, hash_value, std::forward(f))) { + lazy_emplace_non_zero(key, origin, it, hash_value, std::forward(f)); + } } /// Copy the cell from another hash table. It is assumed that the cell is not zero, and also that there was no such key in the table yet. @@ -942,11 +943,15 @@ class HashTable : private boost::noncopyable, memcpy(static_cast(&buf[place_value]), cell, sizeof(*cell)); ++m_size; - if (UNLIKELY(grower.overflow(m_size))) resize(); + if (UNLIKELY(grower.overflow(m_size))) { + resize(); + } } LookupResult ALWAYS_INLINE find(Key x) { - if (Cell::is_zero(x, *this)) return this->get_has_zero() ? this->zero_value() : nullptr; + if (Cell::is_zero(x, *this)) { + return this->get_has_zero() ? this->zero_value() : nullptr; + } size_t hash_value = hash(x); auto [is_zero, place_value] = find_cell_opt(x, hash_value, grower.place(hash_value)); diff --git a/be/src/vec/common/hash_table/hash_table_key_holder.h b/be/src/vec/common/hash_table/hash_table_key_holder.h deleted file mode 100644 index 7137403ff2f6c9..00000000000000 --- a/be/src/vec/common/hash_table/hash_table_key_holder.h +++ /dev/null @@ -1,150 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. 
See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. -// This file is copied from -// https://github.com/ClickHouse/ClickHouse/blob/master/src/Common/HashTable/HashTableKeyHolder.h -// and modified by Doris - -#pragma once - -#include "vec/common/arena.h" -#include "vec/common/string_ref.h" - -/** - * In some aggregation scenarios, when adding a key to the hash table, we - * start with a temporary key object, and if it turns out to be a new key, - * we must make it persistent (e.g. copy to an Arena) and use the resulting - * persistent object as hash table key. This happens only for StringRef keys, - * because other key types are stored by value, but StringRef is a pointer-like - * type: the actual data are stored elsewhere. Even for StringRef, we don't - * make a persistent copy of the key in each of the following cases: - * 1) the aggregation method doesn't use temporary keys, so they're persistent - * from the start; - * 1) the key is already present in the hash table; - * 3) that particular key is stored by value, e.g. a short StringRef key in - * StringHashMap. - * - * In the past, the caller was responsible for making the key persistent after - * in was inserted. emplace() returned whether the key is new or not, so the - * caller only stored new keys (this is case (2) from the above list). However, - * now we are adding a compound hash table for StringRef keys, so case (3) - * appears. The decision about persistence now depends on some properties of - * the key, and the logic of this decision is tied to the particular hash table - * implementation. This means that the hash table user now doesn't have enough - * data and logic to make this decision by itself. - * - * To support these new requirements, we now manage key persistence by passing - * a special key holder to emplace(), which has the functions to make the key - * persistent or to discard it. emplace() then calls these functions at the - * appropriate moments. - * - * This approach has the following benefits: - * - no extra runtime branches in the caller to make the key persistent. - * - no additional data is stored in the hash table itself, which is important - * when it's used in aggregate function states. - * - no overhead when the key memory management isn't needed: we just pass the - * bare key without any wrapper to emplace(), and the default callbacks do - * nothing. - * - * This file defines the default key persistence functions, as well as two - * different key holders and corresponding functions for storing StringRef - * keys to Arena. - */ - -/** - * Returns the key. Can return the temporary key initially. - * After the call to key_holder_persist_key(), must return the persistent key. - */ -template -Key& key_holder_get_key(Key&& key) { - return key; -} - -/** - * Make the key persistent. key_holder_get_key() must return the persistent key - * after this call. 
- */ -template -void key_holder_persist_key(Key&&) {} - -/** - * Discard the key. Calling key_holder_get_key() is ill-defined after this. - */ -template -void key_holder_discard_key(Key&&) {} - -namespace doris::vectorized { - -/** - * ArenaKeyHolder is a key holder for hash tables that serializes a StringRef - * key to an Arena. - */ -struct ArenaKeyHolder { - StringRef key; - Arena& pool; -}; - -} // namespace doris::vectorized - -inline doris::StringRef& ALWAYS_INLINE -key_holder_get_key(doris::vectorized::ArenaKeyHolder& holder) { - return holder.key; -} - -inline void ALWAYS_INLINE key_holder_persist_key(doris::vectorized::ArenaKeyHolder& holder) { - // Hash table shouldn't ask us to persist a zero key - assert(holder.key.size > 0); - holder.key.data = holder.pool.insert(holder.key.data, holder.key.size); -} - -inline void ALWAYS_INLINE key_holder_discard_key(doris::vectorized::ArenaKeyHolder&) {} - -namespace doris::vectorized { - -/** - * SerializedKeyHolder is a key holder for a StringRef key that is already - * serialized to an Arena. The key must be the last allocation in this Arena, - * and is discarded by rolling back the allocation. - */ -struct SerializedKeyHolder { - StringRef key; - Arena& pool; -}; - -} // namespace doris::vectorized - -inline doris::StringRef& ALWAYS_INLINE -key_holder_get_key(doris::vectorized::SerializedKeyHolder& holder) { - return holder.key; -} - -inline void ALWAYS_INLINE key_holder_persist_key(doris::vectorized::SerializedKeyHolder&) {} - -inline void ALWAYS_INLINE key_holder_discard_key(doris::vectorized::SerializedKeyHolder& holder) { - [[maybe_unused]] void* new_head = holder.pool.rollback(holder.key.size); - assert(new_head == holder.key.data); - holder.key.data = nullptr; - holder.key.size = 0; -} - -template -void key_holder_persist_key_with_arena(Key&, doris::vectorized::Arena&) {} - -inline void key_holder_persist_key_with_arena(doris::StringRef& key, - doris::vectorized::Arena& arena) { - // Hash table shouldn't ask us to persist a zero key - key.data = arena.insert(key.data, key.size); -} diff --git a/be/src/vec/common/hash_table/hash_table_set_build.h b/be/src/vec/common/hash_table/hash_table_set_build.h index 977f9deac396e4..8afa52b4b16baf 100644 --- a/be/src/vec/common/hash_table/hash_table_set_build.h +++ b/be/src/vec/common/hash_table/hash_table_set_build.h @@ -21,7 +21,6 @@ #include "vec/exec/vset_operation_node.h" namespace doris::vectorized { -//build hash table for operation node, intersect/except node template struct HashTableBuild { HashTableBuild(int rows, ColumnRawPtrs& build_raw_ptrs, @@ -33,45 +32,31 @@ struct HashTableBuild { _operation_node(operation_node), _state(state) {} - Status operator()(HashTableContext& hash_table_ctx) { + Status operator()(HashTableContext& hash_table_ctx, Arena& arena) { using KeyGetter = typename HashTableContext::State; using Mapped = typename HashTableContext::Mapped; - int64_t old_bucket_bytes = hash_table_ctx.hash_table.get_buffer_size_in_bytes(); + int64_t old_bucket_bytes = hash_table_ctx.hash_table->get_buffer_size_in_bytes(); Defer defer {[&]() { - int64_t bucket_bytes = hash_table_ctx.hash_table.get_buffer_size_in_bytes(); + int64_t bucket_bytes = hash_table_ctx.hash_table->get_buffer_size_in_bytes(); _operation_node->_mem_used += bucket_bytes - old_bucket_bytes; }}; - KeyGetter key_getter(_build_raw_ptrs, _operation_node->_build_key_sz, nullptr); + KeyGetter key_getter(_build_raw_ptrs, _operation_node->_build_key_sz); + hash_table_ctx.init_serialized_keys(_build_raw_ptrs, 
_operation_node->_build_key_sz, _rows); - if constexpr (ColumnsHashing::IsPreSerializedKeysHashMethodTraits::value) { - hash_table_ctx.serialize_keys(_build_raw_ptrs, _rows); - key_getter.set_serialized_keys(hash_table_ctx.keys.data()); - } - - _build_side_hash_values.resize(_rows); - const auto& keys = key_getter.get_keys(); - for (size_t k = 0; k < _rows; ++k) { - _build_side_hash_values[k] = hash_table_ctx.hash_table.hash(keys[k]); - } + size_t k = 0; + auto creator = [&](const auto& ctor, auto& key, auto& origin) { + HashTableContext::try_presis_key(key, origin, arena); + ctor(key, Mapped {k, _offset}); + }; + auto creator_for_null_key = [&](auto& mapped) { mapped = {k, _offset}; }; - for (size_t k = 0; k < _rows; ++k) { + for (; k < _rows; ++k) { if (k % CHECK_FRECUENCY == 0) { RETURN_IF_CANCELLED(_state); } - auto emplace_result = key_getter.emplace_with_key(hash_table_ctx.hash_table, keys[k], - _build_side_hash_values[k], k); - - if (LIKELY(k + HASH_MAP_PREFETCH_DIST < _rows)) { - key_getter.template prefetch_by_hash( - hash_table_ctx.hash_table, - _build_side_hash_values[k + HASH_MAP_PREFETCH_DIST]); - } - - if (emplace_result.is_inserted()) { //only inserted once as the same key, others skip - new (&emplace_result.get_mapped()) Mapped({k, _offset}); - } + hash_table_ctx.lazy_emplace(key_getter, k, creator, creator_for_null_key); } return Status::OK(); } @@ -82,7 +67,6 @@ struct HashTableBuild { ColumnRawPtrs& _build_raw_ptrs; VSetOperationNode* _operation_node; RuntimeState* _state; - std::vector _build_side_hash_values; }; template @@ -94,42 +78,29 @@ struct HashTableBuildX { HashTableContext& hash_table_ctx) { using KeyGetter = typename HashTableContext::State; using Mapped = typename HashTableContext::Mapped; - int64_t old_bucket_bytes = hash_table_ctx.hash_table.get_buffer_size_in_bytes(); + int64_t old_bucket_bytes = hash_table_ctx.hash_table->get_buffer_size_in_bytes(); Defer defer {[&]() { - int64_t bucket_bytes = hash_table_ctx.hash_table.get_buffer_size_in_bytes(); + int64_t bucket_bytes = hash_table_ctx.hash_table->get_buffer_size_in_bytes(); local_state._shared_state->mem_used += bucket_bytes - old_bucket_bytes; }}; - KeyGetter key_getter(_build_raw_ptrs, local_state._shared_state->build_key_sz, nullptr); + KeyGetter key_getter(_build_raw_ptrs, local_state._shared_state->build_key_sz); + hash_table_ctx.init_serialized_keys(_build_raw_ptrs, + local_state._shared_state->build_key_sz, _rows); - if constexpr (ColumnsHashing::IsPreSerializedKeysHashMethodTraits::value) { - hash_table_ctx.serialize_keys(_build_raw_ptrs, _rows); - key_getter.set_serialized_keys(hash_table_ctx.keys.data()); - } - - _build_side_hash_values.resize(_rows); - const auto& keys = key_getter.get_keys(); - for (size_t k = 0; k < _rows; ++k) { - _build_side_hash_values[k] = hash_table_ctx.hash_table.hash(keys[k]); - } + size_t k = 0; + auto creator = [&](const auto& ctor, auto& key, auto& origin) { + HashTableContext::try_presis_key(key, origin, local_state._arena); + ctor(key, Mapped {k, _offset}); + }; + auto creator_for_null_key = [&](auto& mapped) { mapped = {k, _offset}; }; - for (size_t k = 0; k < _rows; ++k) { + for (; k < _rows; ++k) { if (k % CHECK_FRECUENCY == 0) { RETURN_IF_CANCELLED(_state); } - auto emplace_result = key_getter.emplace_with_key(hash_table_ctx.hash_table, keys[k], - _build_side_hash_values[k], k); - - if (LIKELY(k + HASH_MAP_PREFETCH_DIST < _rows)) { - key_getter.template prefetch_by_hash( - hash_table_ctx.hash_table, - _build_side_hash_values[k + HASH_MAP_PREFETCH_DIST]); - 
} - - if (emplace_result.is_inserted()) { //only inserted once as the same key, others skip - new (&emplace_result.get_mapped()) Mapped({k, _offset}); - } + hash_table_ctx.lazy_emplace(key_getter, k, creator, creator_for_null_key); } return Status::OK(); } diff --git a/be/src/vec/common/hash_table/hash_table_set_probe.h b/be/src/vec/common/hash_table/hash_table_set_probe.h index 83139d810eead0..b9962c58d9347f 100644 --- a/be/src/vec/common/hash_table/hash_table_set_probe.h +++ b/be/src/vec/common/hash_table/hash_table_set_probe.h @@ -21,34 +21,24 @@ #include "vec/exec/vset_operation_node.h" namespace doris::vectorized { + template struct HashTableProbe { HashTableProbe(VSetOperationNode* operation_node, int probe_rows) : _operation_node(operation_node), _probe_rows(probe_rows), - _probe_raw_ptrs(operation_node->_probe_columns), - _arena(new Arena) {} + _probe_raw_ptrs(operation_node->_probe_columns) {} Status mark_data_in_hashtable(HashTableContext& hash_table_ctx) { using KeyGetter = typename HashTableContext::State; - KeyGetter key_getter(_probe_raw_ptrs, _operation_node->_probe_key_sz, nullptr); - if constexpr (ColumnsHashing::IsPreSerializedKeysHashMethodTraits::value) { - if (_probe_keys.size() < _probe_rows) { - _probe_keys.resize(_probe_rows); - } - size_t keys_size = _probe_raw_ptrs.size(); - for (size_t i = 0; i < _probe_rows; ++i) { - _probe_keys[i] = - serialize_keys_to_pool_contiguous(i, keys_size, _probe_raw_ptrs, *_arena); - } - key_getter.set_serialized_keys(_probe_keys.data()); - } + KeyGetter key_getter(_probe_raw_ptrs, _operation_node->_probe_key_sz); + hash_table_ctx.init_serialized_keys(_probe_raw_ptrs, _operation_node->_probe_key_sz, + _probe_rows); if constexpr (std::is_same_v) { for (int probe_index = 0; probe_index < _probe_rows; probe_index++) { - auto find_result = - key_getter.find_key(hash_table_ctx.hash_table, probe_index, *_arena); + auto find_result = hash_table_ctx.find(key_getter, probe_index); if (find_result.is_found()) { //if found, marked visited auto it = find_result.get_mapped().begin(); if (!(it->visited)) { @@ -71,38 +61,25 @@ struct HashTableProbe { VSetOperationNode* _operation_node; const size_t _probe_rows; ColumnRawPtrs& _probe_raw_ptrs; - std::unique_ptr _arena; std::vector _probe_keys; }; template struct HashTableProbeX { HashTableProbeX(pipeline::SetProbeSinkLocalState& local_state, int probe_rows) - : _probe_rows(probe_rows), - _probe_raw_ptrs(local_state._probe_columns), - _arena(new Arena) {} + : _probe_rows(probe_rows), _probe_raw_ptrs(local_state._probe_columns) {} Status mark_data_in_hashtable(pipeline::SetProbeSinkLocalState& local_state, HashTableContext& hash_table_ctx) { using KeyGetter = typename HashTableContext::State; - KeyGetter key_getter(_probe_raw_ptrs, local_state._shared_state->probe_key_sz, nullptr); - if constexpr (ColumnsHashing::IsPreSerializedKeysHashMethodTraits::value) { - if (_probe_keys.size() < _probe_rows) { - _probe_keys.resize(_probe_rows); - } - size_t keys_size = _probe_raw_ptrs.size(); - for (size_t i = 0; i < _probe_rows; ++i) { - _probe_keys[i] = - serialize_keys_to_pool_contiguous(i, keys_size, _probe_raw_ptrs, *_arena); - } - key_getter.set_serialized_keys(_probe_keys.data()); - } + KeyGetter key_getter(_probe_raw_ptrs, local_state._shared_state->probe_key_sz); + hash_table_ctx.init_serialized_keys(_probe_raw_ptrs, + local_state._shared_state->probe_key_sz, _probe_rows); if constexpr (std::is_same_v) { for (int probe_index = 0; probe_index < _probe_rows; probe_index++) { - auto find_result = - 
key_getter.find_key(hash_table_ctx.hash_table, probe_index, *_arena); + auto find_result = hash_table_ctx.find(key_getter, probe_index); if (find_result.is_found()) { //if found, marked visited auto it = find_result.get_mapped().begin(); if (!(it->visited)) { @@ -124,7 +101,6 @@ struct HashTableProbeX { private: const size_t _probe_rows; ColumnRawPtrs& _probe_raw_ptrs; - std::unique_ptr _arena; std::vector _probe_keys; }; diff --git a/be/src/vec/common/hash_table/hash_table_utils.h b/be/src/vec/common/hash_table/hash_table_utils.h index 5c081f80b91a43..37916ef240155f 100644 --- a/be/src/vec/common/hash_table/hash_table_utils.h +++ b/be/src/vec/common/hash_table/hash_table_utils.h @@ -23,6 +23,4 @@ template struct HashTableTraits { static constexpr bool is_phmap = false; - static constexpr bool is_string_hash_table = false; - static constexpr bool is_partitioned_table = false; }; diff --git a/be/src/vec/common/hash_table/partitioned_hash_map.h b/be/src/vec/common/hash_table/partitioned_hash_map.h index c60192ef23db32..f667360592d6f0 100644 --- a/be/src/vec/common/hash_table/partitioned_hash_map.h +++ b/be/src/vec/common/hash_table/partitioned_hash_map.h @@ -58,30 +58,12 @@ using PartitionedHashMap = template > using PHPartitionedHashMap = PartitionedHashMapTable>; -template -struct HashTableTraits> { - static constexpr bool is_phmap = false; - static constexpr bool is_string_hash_table = false; - static constexpr bool is_partitioned_table = true; -}; - -template