Skip to content

Commit

Permalink
[Improvement](hash) refactor of hash map context (apache#24966)
Browse files Browse the repository at this point in the history
refactor of hash map context
  • Loading branch information
BiteTheDDDDt authored Oct 12, 2023
1 parent 04bda13 commit 1a0344d
Show file tree
Hide file tree
Showing 57 changed files with 1,233 additions and 2,775 deletions.
135 changes: 29 additions & 106 deletions be/src/pipeline/exec/aggregation_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,8 @@ Status AggSinkLocalState<DependencyType, Derived>::init(RuntimeState* state,

std::visit(
[&](auto&& agg_method) {
using HashTableType = std::decay_t<decltype(agg_method.data)>;
using KeyType = typename HashTableType::key_type;
using HashTableType = std::decay_t<decltype(agg_method)>;
using KeyType = typename HashTableType::Key;

/// some aggregate functions (like AVG for decimal) have align issues.
Base::_shared_state->aggregate_data_container.reset(
Expand Down Expand Up @@ -203,17 +203,6 @@ Status AggSinkLocalState<DependencyType, Derived>::_merge_with_serialized_key(
template <typename DependencyType, typename Derived>
size_t AggSinkLocalState<DependencyType, Derived>::_memory_usage() const {
size_t usage = 0;
std::visit(
[&](auto&& agg_method) {
using HashMethodType = std::decay_t<decltype(agg_method)>;
if constexpr (vectorized::ColumnsHashing::IsPreSerializedKeysHashMethodTraits<
HashMethodType>::value) {
usage += agg_method.keys_memory_usage;
}
usage += agg_method.data.get_buffer_size_in_bytes();
},
_agg_data->method_variant);

if (_agg_arena_pool) {
usage += _agg_arena_pool->size();
}
Expand All @@ -229,7 +218,7 @@ template <typename DependencyType, typename Derived>
void AggSinkLocalState<DependencyType, Derived>::_update_memusage_with_serialized_key() {
std::visit(
[&](auto&& agg_method) -> void {
auto& data = agg_method.data;
auto& data = *agg_method.hash_table;
auto arena_memory_usage =
_agg_arena_pool->size() +
Base::_shared_state->aggregate_data_container->memory_usage() -
Expand Down Expand Up @@ -321,7 +310,7 @@ Status AggSinkLocalState<DependencyType, Derived>::_merge_with_serialized_key_he

for (int i = 0; i < Base::_shared_state->aggregate_evaluators.size(); ++i) {
if (Base::_shared_state->aggregate_evaluators[i]->is_merge() || for_spill) {
int col_id;
int col_id = 0;
if constexpr (for_spill) {
col_id = Base::_shared_state->probe_expr_ctxs.size() + i;
} else {
Expand Down Expand Up @@ -498,7 +487,7 @@ Status AggSinkLocalState<DependencyType, Derived>::_execute_with_serialized_key_

template <typename DependencyType, typename Derived>
size_t AggSinkLocalState<DependencyType, Derived>::_get_hash_table_size() {
return std::visit([&](auto&& agg_method) { return agg_method.data.size(); },
return std::visit([&](auto&& agg_method) { return agg_method.hash_table->size(); },
_agg_data->method_variant);
}

Expand All @@ -510,29 +499,20 @@ void AggSinkLocalState<DependencyType, Derived>::_emplace_into_hash_table(
[&](auto&& agg_method) -> void {
SCOPED_TIMER(_hash_table_compute_timer);
using HashMethodType = std::decay_t<decltype(agg_method)>;
using HashTableType = std::decay_t<decltype(agg_method.data)>;
using AggState = typename HashMethodType::State;
AggState state(key_columns, Base::_shared_state->probe_key_sz, nullptr);

_pre_serialize_key_if_need(state, agg_method, key_columns, num_rows);

auto creator = [&](const auto& ctor, const auto& key) {
using KeyType = std::decay_t<decltype(key)>;
if constexpr (HashTableTraits<HashTableType>::is_string_hash_table &&
!std::is_same_v<StringRef, KeyType>) {
StringRef string_ref = to_string_ref(key);
vectorized::ArenaKeyHolder key_holder {string_ref, *_agg_arena_pool};
key_holder_persist_key(key_holder);
auto mapped = Base::_shared_state->aggregate_data_container->append_data(
key_holder.key);
static_cast<void>(Base::_dependency->create_agg_status(mapped));
ctor(key, mapped);
} else {
auto mapped =
Base::_shared_state->aggregate_data_container->append_data(key);
static_cast<void>(Base::_dependency->create_agg_status(mapped));
ctor(key, mapped);
AggState state(key_columns, Base::_shared_state->probe_key_sz);
agg_method.init_serialized_keys(key_columns, Base::_shared_state->probe_key_sz,
num_rows);

auto creator = [this](const auto& ctor, auto& key, auto& origin) {
HashMethodType::try_presis_key(key, origin, *_agg_arena_pool);
auto mapped =
Base::_shared_state->aggregate_data_container->append_data(origin);
auto st = Base::_dependency->create_agg_status(mapped);
if (!st) {
throw Exception(st.code(), st.to_string());
}
ctor(key, mapped);
};

auto creator_for_null_key = [&](auto& mapped) {
Expand All @@ -541,49 +521,15 @@ void AggSinkLocalState<DependencyType, Derived>::_emplace_into_hash_table(
._total_size_of_aggregate_states,
Base::_parent->template cast<typename Derived::Parent>()
._align_aggregate_states);
static_cast<void>(Base::_dependency->create_agg_status(mapped));
};

if constexpr (HashTableTraits<HashTableType>::is_phmap) {
const auto& keys = state.get_keys();
if (_hash_values.size() < num_rows) {
_hash_values.resize(num_rows);
}
for (size_t i = 0; i < num_rows; ++i) {
_hash_values[i] = agg_method.data.hash(keys[i]);
auto st = Base::_dependency->create_agg_status(mapped);
if (!st) {
throw Exception(st.code(), st.to_string());
}
};

SCOPED_TIMER(_hash_table_emplace_timer);
if constexpr (vectorized::ColumnsHashing::IsSingleNullableColumnMethod<
AggState>::value) {
for (size_t i = 0; i < num_rows; ++i) {
if (LIKELY(i + HASH_MAP_PREFETCH_DIST < num_rows)) {
agg_method.data.prefetch_by_hash(
_hash_values[i + HASH_MAP_PREFETCH_DIST]);
}

places[i] = state.lazy_emplace_key(agg_method.data, i, *_agg_arena_pool,
_hash_values[i], creator,
creator_for_null_key);
}
} else {
state.lazy_emplace_keys(agg_method.data, keys, _hash_values, creator,
places);
}
} else {
SCOPED_TIMER(_hash_table_emplace_timer);
for (size_t i = 0; i < num_rows; ++i) {
vectorized::AggregateDataPtr mapped = nullptr;
if constexpr (vectorized::ColumnsHashing::IsSingleNullableColumnMethod<
AggState>::value) {
mapped = state.lazy_emplace_key(agg_method.data, i, *_agg_arena_pool,
creator, creator_for_null_key);
} else {
mapped = state.lazy_emplace_key(agg_method.data, i, *_agg_arena_pool,
creator);
}
places[i] = mapped;
}
SCOPED_TIMER(_hash_table_emplace_timer);
for (size_t i = 0; i < num_rows; ++i) {
places[i] = agg_method.lazy_emplace(state, i, creator, creator_for_null_key);
}

COUNTER_UPDATE(_hash_table_input_counter, num_rows);
Expand All @@ -598,35 +544,14 @@ void AggSinkLocalState<DependencyType, Derived>::_find_in_hash_table(
std::visit(
[&](auto&& agg_method) -> void {
using HashMethodType = std::decay_t<decltype(agg_method)>;
using HashTableType = std::decay_t<decltype(agg_method.data)>;
using AggState = typename HashMethodType::State;
AggState state(key_columns, Base::_shared_state->probe_key_sz, nullptr);

_pre_serialize_key_if_need(state, agg_method, key_columns, num_rows);
const auto& keys = state.get_keys();
if constexpr (HashTableTraits<HashTableType>::is_phmap) {
_hash_values.resize(num_rows);

for (size_t i = 0; i < num_rows; ++i) {
_hash_values[i] = agg_method.data.hash(keys[i]);
}
}
AggState state(key_columns, Base::_shared_state->probe_key_sz);
agg_method.init_serialized_keys(key_columns, Base::_shared_state->probe_key_sz,
num_rows);

/// For all rows.
for (size_t i = 0; i < num_rows; ++i) {
auto find_result = [&]() {
if constexpr (HashTableTraits<HashTableType>::is_phmap) {
if (LIKELY(i + HASH_MAP_PREFETCH_DIST < num_rows)) {
agg_method.data.prefetch_by_hash(
_hash_values[i + HASH_MAP_PREFETCH_DIST]);
}

return state.find_key_with_hash(agg_method.data, _hash_values[i],
keys[i]);
} else {
return state.find_key(agg_method.data, i, *_agg_arena_pool);
}
}();
auto find_result = agg_method.find(state, i);

if (find_result.is_found()) {
places[i] = find_result.get_mapped();
Expand Down Expand Up @@ -759,7 +684,7 @@ Status AggSinkLocalState<DependencyType, Derived>::try_spill_disk(bool eos) {
}
return std::visit(
[&](auto&& agg_method) -> Status {
auto& hash_table = agg_method.data;
auto& hash_table = *agg_method.hash_table;
if (!eos &&
_memory_usage() < Base::_parent->template cast<typename Derived::Parent>()
._external_agg_bytes_threshold) {
Expand Down Expand Up @@ -935,8 +860,6 @@ Status AggSinkLocalState<DependencyType, Derived>::close(RuntimeState* state, St
std::vector<char> tmp_deserialize_buffer;
_deserialize_buffer.swap(tmp_deserialize_buffer);

std::vector<size_t> tmp_hash_values;
_hash_values.swap(tmp_hash_values);
return Base::close(state, exec_status);
}

Expand Down
24 changes: 4 additions & 20 deletions be/src/pipeline/exec/aggregation_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,22 +77,6 @@ class AggSinkLocalState : public PipelineXSinkLocalState<DependencyType> {
Status _execute_with_serialized_key_helper(vectorized::Block* block);
void _find_in_hash_table(vectorized::AggregateDataPtr* places,
vectorized::ColumnRawPtrs& key_columns, size_t num_rows);

template <typename AggState, typename AggMethod>
void _pre_serialize_key_if_need(AggState& state, AggMethod& agg_method,
const vectorized::ColumnRawPtrs& key_columns,
const size_t num_rows) {
if constexpr (vectorized::ColumnsHashing::IsPreSerializedKeysHashMethodTraits<
AggState>::value) {
auto old_keys_memory = agg_method.keys_memory_usage;
SCOPED_TIMER(_serialize_key_timer);
int64_t row_size = (int64_t)(agg_method.serialize_keys(key_columns, num_rows));
COUNTER_SET(_max_row_size_counter, std::max(_max_row_size_counter->value(), row_size));
state.set_serialized_keys(agg_method.keys.data());

_serialize_key_arena_memory_usage->add(agg_method.keys_memory_usage - old_keys_memory);
}
}
void _emplace_into_hash_table(vectorized::AggregateDataPtr* places,
vectorized::ColumnRawPtrs& key_columns, const size_t num_rows);
size_t _get_hash_table_size();
Expand Down Expand Up @@ -123,7 +107,7 @@ class AggSinkLocalState : public PipelineXSinkLocalState<DependencyType> {
->create_serialize_column();
}

context.init_once();
context.init_iterator();
const auto size = hash_table.size();
std::vector<KeyType> keys(size);
if (Base::_shared_state->values.size() < size) {
Expand Down Expand Up @@ -156,7 +140,8 @@ class AggSinkLocalState : public PipelineXSinkLocalState<DependencyType> {
key_columns[0]->insert_data(nullptr, 0);

// Here is no need to set `keys[num_rows]`, keep it as default value.
Base::_shared_state->values[num_rows] = hash_table.get_null_key_data();
Base::_shared_state->values[num_rows] =
hash_table.template get_null_key_data<vectorized::AggregateDataPtr>();
++num_rows;
}

Expand Down Expand Up @@ -304,7 +289,6 @@ class AggSinkLocalState : public PipelineXSinkLocalState<DependencyType> {

vectorized::AggregatedDataVariants* _agg_data;
vectorized::Arena* _agg_arena_pool;
std::vector<size_t> _hash_values;

using vectorized_execute = std::function<Status(vectorized::Block* block)>;
using vectorized_update_memusage = std::function<void()>;
Expand All @@ -325,7 +309,7 @@ class BlockingAggSinkLocalState

BlockingAggSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state)
: AggSinkLocalState<AggDependency, BlockingAggSinkLocalState>(parent, state) {}
~BlockingAggSinkLocalState() = default;
~BlockingAggSinkLocalState() override = default;
};

template <typename LocalStateType = BlockingAggSinkLocalState>
Expand Down
Loading

0 comments on commit 1a0344d

Please sign in to comment.