Skip to content

Commit

Permalink
[Improvement](set) change set_operator's hash map to phmap (#43273)
Browse files Browse the repository at this point in the history
### What problem does this PR solve?

1. change set_operator's hash map to phmap.
2. optimize the logic of refresh hash table 

```sql
+-------+--------------+------+-------+---------+-------+
| Field | Type         | Null | Key   | Default | Extra |
+-------+--------------+------+-------+---------+-------+
| k1    | int          | Yes  | true  | NULL    |       |
| k2    | int          | No   | false | NULL    | NONE  |
| k3    | bigint       | Yes  | false | NULL    | NONE  |
| k4    | varchar(100) | Yes  | false | NULL    | NONE  |
+-------+--------------+------+-------+---------+-------+
```

```sql
select count(*) from (select * from a_table intersect select * from  b_table)t;
before:Time(ms)=37768|CpuTimeMS=61754|PeakMemoryBytes=129921088
after: Time(ms)=25218|CpuTimeMS=44571|PeakMemoryBytes=129902624
```
```sql
select count(*) from (select k1 from a_table intersect select k1 from  b_table)t;
before: Time(ms)=7964|CpuTimeMS=13431|PeakMemoryBytes=130062400
after:  Time(ms)=7459|CpuTimeMS=11976|PeakMemoryBytes=130091048
```
  • Loading branch information
BiteTheDDDDt authored Nov 11, 2024
1 parent 8e5a657 commit dae8f60
Show file tree
Hide file tree
Showing 9 changed files with 96 additions and 114 deletions.
74 changes: 28 additions & 46 deletions be/src/pipeline/common/set_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,21 +25,32 @@

namespace doris {

template <class Key>
using SetFixedKeyHashTableContext =
vectorized::MethodKeysFixed<HashMap<Key, pipeline::RowRefListWithFlags, HashCRC32<Key>>>;
template <typename T>
using SetData = PHHashMap<T, RowRefListWithFlags, HashCRC32<T>>;

template <class T>
using SetPrimaryTypeHashTableContext =
vectorized::MethodOneNumber<T, HashMap<T, pipeline::RowRefListWithFlags, HashCRC32<T>>>;
template <typename T>
using SetFixedKeyHashTableContext = vectorized::MethodKeysFixed<SetData<T>>;

template <typename T>
using SetPrimaryTypeHashTableContext = vectorized::MethodOneNumber<T, SetData<T>>;

template <typename T>
using SetPrimaryTypeHashTableContextNullable = vectorized::MethodSingleNullableColumn<
vectorized::MethodOneNumber<T, vectorized::DataWithNullKey<SetData<T>>>>;

using SetSerializedHashTableContext =
vectorized::MethodSerialized<HashMap<StringRef, pipeline::RowRefListWithFlags>>;
vectorized::MethodSerialized<PHHashMap<StringRef, RowRefListWithFlags>>;
using SetMethodOneString =
vectorized::MethodStringNoCache<HashMap<StringRef, pipeline::RowRefListWithFlags>>;
vectorized::MethodStringNoCache<PHHashMap<StringRef, RowRefListWithFlags>>;

using SetHashTableVariants =
std::variant<std::monostate, SetSerializedHashTableContext, SetMethodOneString,
SetPrimaryTypeHashTableContextNullable<vectorized::UInt8>,
SetPrimaryTypeHashTableContextNullable<vectorized::UInt16>,
SetPrimaryTypeHashTableContextNullable<vectorized::UInt32>,
SetPrimaryTypeHashTableContextNullable<vectorized::UInt64>,
SetPrimaryTypeHashTableContextNullable<vectorized::UInt128>,
SetPrimaryTypeHashTableContextNullable<vectorized::UInt256>,
SetPrimaryTypeHashTableContext<vectorized::UInt8>,
SetPrimaryTypeHashTableContext<vectorized::UInt16>,
SetPrimaryTypeHashTableContext<vectorized::UInt32>,
Expand All @@ -51,61 +62,32 @@ using SetHashTableVariants =
SetFixedKeyHashTableContext<vectorized::UInt256>,
SetFixedKeyHashTableContext<vectorized::UInt136>>;

struct SetDataVariants {
SetHashTableVariants method_variant;

struct SetDataVariants
: public DataVariants<SetHashTableVariants, vectorized::MethodSingleNullableColumn,
vectorized::MethodOneNumber, vectorized::DataWithNullKey> {
void init(const std::vector<vectorized::DataTypePtr>& data_types, HashKeyType type) {
bool nullable = data_types.size() == 1 && data_types[0]->is_nullable();
switch (type) {
case HashKeyType::serialized:
method_variant.emplace<SetSerializedHashTableContext>();
break;
case HashKeyType::int8_key:
if (nullable) {
method_variant.emplace<SetFixedKeyHashTableContext<vectorized::UInt64>>(
get_key_sizes(data_types));
} else {
method_variant.emplace<SetPrimaryTypeHashTableContext<vectorized::UInt8>>();
}
emplace_single<vectorized::UInt8, SetData<vectorized::UInt8>>(nullable);
break;
case HashKeyType::int16_key:
if (nullable) {
method_variant.emplace<SetFixedKeyHashTableContext<vectorized::UInt64>>(
get_key_sizes(data_types));
} else {
method_variant.emplace<SetPrimaryTypeHashTableContext<vectorized::UInt16>>();
}
emplace_single<vectorized::UInt16, SetData<vectorized::UInt16>>(nullable);
break;
case HashKeyType::int32_key:
if (nullable) {
method_variant.emplace<SetFixedKeyHashTableContext<vectorized::UInt64>>(
get_key_sizes(data_types));
} else {
method_variant.emplace<SetPrimaryTypeHashTableContext<vectorized::UInt32>>();
}
emplace_single<vectorized::UInt32, SetData<vectorized::UInt32>>(nullable);
break;
case HashKeyType::int64_key:
if (nullable) {
method_variant.emplace<SetFixedKeyHashTableContext<vectorized::UInt128>>(
get_key_sizes(data_types));
} else {
method_variant.emplace<SetPrimaryTypeHashTableContext<vectorized::UInt64>>();
}
emplace_single<vectorized::UInt64, SetData<vectorized::UInt64>>(nullable);
break;
case HashKeyType::int128_key:
if (nullable) {
method_variant.emplace<SetFixedKeyHashTableContext<vectorized::UInt136>>(
get_key_sizes(data_types));
} else {
method_variant.emplace<SetPrimaryTypeHashTableContext<vectorized::UInt128>>();
}
emplace_single<vectorized::UInt128, SetData<vectorized::UInt128>>(nullable);
break;
case HashKeyType::int256_key:
if (nullable) {
method_variant.emplace<SetSerializedHashTableContext>();
} else {
method_variant.emplace<SetPrimaryTypeHashTableContext<vectorized::UInt256>>();
}
emplace_single<vectorized::UInt256, SetData<vectorized::UInt256>>(nullable);
break;
case HashKeyType::string_key:
method_variant.emplace<SetMethodOneString>();
Expand Down
18 changes: 6 additions & 12 deletions be/src/pipeline/exec/join/join_op.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
#include "vec/common/columns_hashing.h"
#include "vec/core/block.h"

namespace doris::pipeline {
namespace doris {
/**
* Now we have different kinds of RowRef for join operation. Overall, RowRef is the base class and
* the class inheritance is below:
Expand Down Expand Up @@ -129,12 +129,10 @@ struct RowRefList : RowRef {
RowRefList() = default;
RowRefList(size_t row_num_) : RowRef(row_num_) {}

ForwardIterator<RowRefList> begin() { return ForwardIterator<RowRefList>(this); }
ForwardIterator<RowRefList> begin() { return {this}; }

/// insert element after current one
void insert(RowRefType&& row_ref, vectorized::Arena& pool) {
next.emplace_back(std::move(row_ref));
}
void insert(RowRefType&& row_ref, vectorized::Arena& pool) { next.emplace_back(row_ref); }

void clear() { next.clear(); }

Expand All @@ -149,9 +147,7 @@ struct RowRefListWithFlag : RowRef {
RowRefListWithFlag() = default;
RowRefListWithFlag(size_t row_num_) : RowRef(row_num_) {}

ForwardIterator<RowRefListWithFlag> const begin() {
return ForwardIterator<RowRefListWithFlag>(this);
}
ForwardIterator<RowRefListWithFlag> begin() { return {this}; }

/// insert element after current one
void insert(RowRefType&& row_ref, vectorized::Arena& pool) { next.emplace_back(row_ref); }
Expand All @@ -171,9 +167,7 @@ struct RowRefListWithFlags : RowRefWithFlag {
RowRefListWithFlags() = default;
RowRefListWithFlags(size_t row_num_) : RowRefWithFlag(row_num_) {}

ForwardIterator<RowRefListWithFlags> const begin() {
return ForwardIterator<RowRefListWithFlags>(this);
}
ForwardIterator<RowRefListWithFlags> begin() { return {this}; }

/// insert element after current one
void insert(RowRefType&& row_ref, vectorized::Arena& pool) { next.emplace_back(row_ref); }
Expand All @@ -185,4 +179,4 @@ struct RowRefListWithFlags : RowRefWithFlag {
std::vector<RowRefType> next;
};

} // namespace doris::pipeline
} // namespace doris
74 changes: 40 additions & 34 deletions be/src/pipeline/exec/set_probe_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -210,46 +210,52 @@ void SetProbeSinkOperatorX<is_intersect>::_refresh_hash_table(
[&](auto&& arg) {
using HashTableCtxType = std::decay_t<decltype(arg)>;
if constexpr (!std::is_same_v<HashTableCtxType, std::monostate>) {
auto tmp_hash_table =
std::make_shared<typename HashTableCtxType::HashMapType>();
bool is_need_shrink =
arg.hash_table->should_be_shrink(valid_element_in_hash_tbl);
if (is_intersect || is_need_shrink) {
tmp_hash_table->init_buf_size(size_t(
valid_element_in_hash_tbl / arg.hash_table->get_factor() + 1));
}

arg.init_iterator();
auto& iter = arg.iterator;
auto iter_end = arg.hash_table->end();
std::visit(
[&](auto is_need_shrink_const) {
while (iter != iter_end) {
auto& mapped = iter->get_second();
auto it = mapped.begin();

if constexpr (is_intersect) { //intersected
if (it->visited) {
it->visited = false;
tmp_hash_table->insert(iter->get_value());
}
++iter;
} else { //except
if constexpr (is_need_shrink_const) {
if (!it->visited) {
tmp_hash_table->insert(iter->get_value());
}
}
++iter;
}
}
},
vectorized::make_bool_variant(is_need_shrink));

arg.reset();
if (is_intersect || is_need_shrink) {
constexpr double need_shrink_ratio = 0.25;
bool is_need_shrink =
is_intersect
? (valid_element_in_hash_tbl <
arg.hash_table
->size()) // When intersect, shrink as long as the element decreases
: (valid_element_in_hash_tbl <
arg.hash_table->size() *
need_shrink_ratio); // When except, element decreases need to within the 'need_shrink_ratio' before shrinking

if (is_need_shrink) {
auto tmp_hash_table =
std::make_shared<typename HashTableCtxType::HashMapType>();
tmp_hash_table->reserve(
local_state._shared_state->valid_element_in_hash_tbl);
while (iter != iter_end) {
auto& mapped = iter->get_second();
auto it = mapped.begin();

if constexpr (is_intersect) {
if (it->visited) {
it->visited = false;
tmp_hash_table->insert(iter->get_first(), iter->get_second());
}
} else {
if (!it->visited) {
tmp_hash_table->insert(iter->get_first(), iter->get_second());
}
}
++iter;
}
arg.hash_table = std::move(tmp_hash_table);
} else if (is_intersect) {
while (iter != iter_end) {
auto& mapped = iter->get_second();
auto it = mapped.begin();
it->visited = false;
++iter;
}
}

arg.reset();
} else {
LOG(FATAL) << "FATAL: uninited hash table";
__builtin_unreachable();
Expand Down
17 changes: 14 additions & 3 deletions be/src/pipeline/exec/set_source_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "set_source_operator.h"

#include <memory>
#include <type_traits>

#include "common/status.h"
#include "pipeline/exec/operator.h"
Expand Down Expand Up @@ -124,11 +125,9 @@ Status SetSourceOperatorX<is_intersect>::_get_data_in_hashtable(
vectorized::Block* output_block, const int batch_size, bool* eos) {
size_t left_col_len = local_state._left_table_data_types.size();
hash_table_ctx.init_iterator();
auto& iter = hash_table_ctx.iterator;
auto block_size = 0;

for (; iter != hash_table_ctx.hash_table->end() && block_size < batch_size; ++iter) {
auto& value = iter->get_second();
auto add_result = [&local_state, &block_size, this](auto value) {
auto it = value.begin();
if constexpr (is_intersect) {
if (it->visited) { //intersected: have done probe, so visited values it's the result
Expand All @@ -139,9 +138,21 @@ Status SetSourceOperatorX<is_intersect>::_get_data_in_hashtable(
_add_result_columns(local_state, value, block_size);
}
}
};

auto& iter = hash_table_ctx.iterator;
for (; iter != hash_table_ctx.hash_table->end() && block_size < batch_size; ++iter) {
add_result(iter->get_second());
}

*eos = iter == hash_table_ctx.hash_table->end();
if (*eos && hash_table_ctx.hash_table->has_null_key_data()) {
auto value = hash_table_ctx.hash_table->template get_null_key_data<RowRefListWithFlags>();
if constexpr (std::is_same_v<RowRefListWithFlags, std::decay_t<decltype(value)>>) {
add_result(value);
}
}

if (!output_block->mem_reuse()) {
for (int i = 0; i < left_col_len; ++i) {
output_block->insert(
Expand Down
5 changes: 0 additions & 5 deletions be/src/vec/common/hash_table/hash_map.h
Original file line number Diff line number Diff line change
Expand Up @@ -201,11 +201,6 @@ using HashMap = HashMapTable<Key, HashMapCell<Key, Mapped, Hash>, Hash, Grower,
template <typename Key, typename Hash = DefaultHash<Key>>
using JoinHashMap = JoinHashTable<Key, Hash>;

template <typename Key, typename Mapped, typename Hash = DefaultHash<Key>,
typename Grower = HashTableGrower<>, typename Allocator = HashTableAllocator>
using HashMapWithSavedHash =
HashMapTable<Key, HashMapCellWithSavedHash<Key, Mapped, Hash>, Hash, Grower, Allocator>;

template <typename Key, typename Mapped, typename Hash, size_t initial_size_degree>
using HashMapWithStackMemory = HashMapTable<
Key, HashMapCellWithSavedHash<Key, Mapped, Hash>, Hash,
Expand Down
7 changes: 1 addition & 6 deletions be/src/vec/common/hash_table/hash_map_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,6 @@
#include "vec/core/types.h"
#include "vec/utils/util.hpp"

namespace doris::pipeline {
struct RowRefListWithFlags;
}

namespace doris::vectorized {

constexpr auto BITSIZE = 8;
Expand Down Expand Up @@ -587,8 +583,7 @@ struct DataWithNullKey : public Base {

private:
bool has_null_key = false;
// null_key_data store AggregateDataPtr on agg node, store PartitionBlocks on partition sort node.
void* null_key_data = nullptr;
Base::Value null_key_data;
};

/// Single low cardinality column.
Expand Down
11 changes: 4 additions & 7 deletions be/src/vec/common/hash_table/ph_hash_map.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ class PHHashMap : private boost::noncopyable {

using key_type = Key;
using mapped_type = Mapped;
using Value = Mapped;
using value_type = std::pair<const Key, Mapped>;

using LookupResult = std::pair<const Key, Mapped>*;
Expand Down Expand Up @@ -154,10 +155,8 @@ class PHHashMap : private boost::noncopyable {
[&](const auto& ctor) { f(ctor, key, key); });
}

void ALWAYS_INLINE insert(const Key& key, size_t hash_value, const Mapped& value) {
auto it = &*_hash_map.lazy_emplace_with_hash(key, hash_value,
[&](const auto& ctor) { ctor(key, value); });
it->second = value;
void ALWAYS_INLINE insert(const Key& key, const Mapped& value) {
_hash_map.lazy_emplace(key, [&](const auto& ctor) { ctor(key, value); });
}

template <typename KeyHolder>
Expand Down Expand Up @@ -190,8 +189,6 @@ class PHHashMap : private boost::noncopyable {
return capacity * sizeof(typename HashMapImpl::slot_type);
}

size_t get_buffer_size_in_cells() const { return _hash_map.capacity(); }

bool add_elem_size_overflow(size_t row) const {
const auto capacity = _hash_map.capacity();
// phmap use 7/8th as maximum load factor.
Expand All @@ -209,7 +206,7 @@ class PHHashMap : private boost::noncopyable {

void clear_and_shrink() { _hash_map.clear(); }

void expanse_for_add_elem(size_t num_elem) { _hash_map.reserve(num_elem); }
void reserve(size_t num_elem) { _hash_map.reserve(num_elem); }

private:
HashMapImpl _hash_map;
Expand Down
3 changes: 2 additions & 1 deletion be/src/vec/common/hash_table/ph_hash_set.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class PHHashSet : private boost::noncopyable {
using key_type = Key;
using mapped_type = void;
using value_type = void;
using Value = void*;

using LookupResult = void*;

Expand Down Expand Up @@ -104,7 +105,7 @@ class PHHashSet : private boost::noncopyable {

void clear_and_shrink() { _hash_set.clear(); }

void expanse_for_add_elem(size_t num_elem) { _hash_set.reserve(num_elem); }
void reserve(size_t num_elem) { _hash_set.reserve(num_elem); }

private:
HashSetImpl _hash_set;
Expand Down
1 change: 1 addition & 0 deletions be/src/vec/common/hash_table/string_hash_map.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ class StringHashMap : public StringHashTable<StringHashMapSubMaps<TMapped, Alloc
using Base = StringHashTable<StringHashMapSubMaps<TMapped, Allocator>>;
using Self = StringHashMap;
using LookupResult = typename Base::LookupResult;
using Value = TMapped;

using Base::Base;

Expand Down

0 comments on commit dae8f60

Please sign in to comment.