Skip to content

Commit

Permalink
[refactor](pipelineX) Simplify set operation (apache#25502)
Browse files (browse the repository at this point in the history)
  • Loading branch information
Gabriel39 authored Oct 17, 2023
1 parent 652d6c5 commit 31a5e07
Show file tree
Hide file tree
Showing 8 changed files with 30 additions and 105 deletions.
6 changes: 3 additions & 3 deletions be/src/pipeline/exec/set_probe_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -131,9 +131,9 @@ Status SetProbeSinkOperatorX<is_intersect>::sink(RuntimeState* state, vectorized
[&](auto&& arg) -> Status {
using HashTableCtxType = std::decay_t<decltype(arg)>;
if constexpr (!std::is_same_v<HashTableCtxType, std::monostate>) {
vectorized::HashTableProbeX<HashTableCtxType, is_intersect>
process_hashtable_ctx(local_state, probe_rows);
return process_hashtable_ctx.mark_data_in_hashtable(local_state, arg);
vectorized::HashTableProbe<HashTableCtxType, is_intersect>
process_hashtable_ctx(&local_state, probe_rows);
return process_hashtable_ctx.mark_data_in_hashtable(arg);
} else {
LOG(FATAL) << "FATAL: uninited hash table";
}
Expand Down
5 changes: 3 additions & 2 deletions be/src/pipeline/exec/set_probe_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class RuntimeState;
namespace vectorized {
class Block;
template <class HashTableContext, bool is_intersected>
struct HashTableProbeX;
struct HashTableProbe;
} // namespace vectorized

namespace pipeline {
Expand Down Expand Up @@ -81,11 +81,12 @@ class SetProbeSinkLocalState final : public PipelineXSinkLocalState<SetDependenc
: Base(parent, state) {}

Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
int64_t* valid_element_in_hash_tbl() { return &_shared_state->valid_element_in_hash_tbl; }

private:
friend class SetProbeSinkOperatorX<is_intersect>;
template <class HashTableContext, bool is_intersected>
friend struct vectorized::HashTableProbeX;
friend struct vectorized::HashTableProbe;

//record insert column id during probe
std::vector<uint16_t> _probe_column_inserted_id;
Expand Down
6 changes: 3 additions & 3 deletions be/src/pipeline/exec/set_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,9 @@ Status SetSinkOperatorX<is_intersect>::_process_build_block(
[&](auto&& arg) {
using HashTableCtxType = std::decay_t<decltype(arg)>;
if constexpr (!std::is_same_v<HashTableCtxType, std::monostate>) {
vectorized::HashTableBuildX<HashTableCtxType, is_intersect>
hash_table_build_process(rows, raw_ptrs, offset, state);
static_cast<void>(hash_table_build_process(local_state, arg));
vectorized::HashTableBuild<HashTableCtxType, is_intersect>
hash_table_build_process(&local_state, rows, raw_ptrs, offset, state);
static_cast<void>(hash_table_build_process(arg, local_state._arena));
} else {
LOG(FATAL) << "FATAL: uninited hash table";
}
Expand Down
6 changes: 4 additions & 2 deletions be/src/pipeline/exec/set_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class ExecNode;

namespace vectorized {
template <class HashTableContext, bool is_intersected>
struct HashTableBuildX;
struct HashTableBuild;
}

namespace pipeline {
Expand Down Expand Up @@ -74,10 +74,12 @@ class SetSinkLocalState final : public PipelineXSinkLocalState<SetDependency> {

Status init(RuntimeState* state, LocalSinkStateInfo& info) override;

int64_t* mem_used() { return &_shared_state->mem_used; };

private:
friend class SetSinkOperatorX<is_intersect>;
template <class HashTableContext, bool is_intersected>
friend struct vectorized::HashTableBuildX;
friend struct vectorized::HashTableBuild;

RuntimeProfile::Counter* _build_timer; // time to build hash table
vectorized::MutableBlock _mutable_block;
Expand Down
55 changes: 6 additions & 49 deletions be/src/vec/common/hash_table/hash_table_set_build.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@
namespace doris::vectorized {
template <class HashTableContext, bool is_intersect>
struct HashTableBuild {
HashTableBuild(int rows, ColumnRawPtrs& build_raw_ptrs,
VSetOperationNode<is_intersect>* operation_node, uint8_t offset,
template <typename Parent>
HashTableBuild(Parent* parent, int rows, ColumnRawPtrs& build_raw_ptrs, uint8_t offset,
RuntimeState* state)
: _rows(rows),
: _mem_used(parent->mem_used()),
_rows(rows),
_offset(offset),
_build_raw_ptrs(build_raw_ptrs),
_operation_node(operation_node),
_state(state) {}

Status operator()(HashTableContext& hash_table_ctx, Arena& arena) {
Expand All @@ -39,7 +39,7 @@ struct HashTableBuild {

Defer defer {[&]() {
int64_t bucket_bytes = hash_table_ctx.hash_table->get_buffer_size_in_bytes();
_operation_node->_mem_used += bucket_bytes - old_bucket_bytes;
*_mem_used += bucket_bytes - old_bucket_bytes;
}};

KeyGetter key_getter(_build_raw_ptrs);
Expand All @@ -62,54 +62,11 @@ struct HashTableBuild {
}

private:
int64_t* _mem_used;
const int _rows;
const uint8_t _offset;
ColumnRawPtrs& _build_raw_ptrs;
VSetOperationNode<is_intersect>* _operation_node;
RuntimeState* _state;
};

// Functor that inserts the rows of one build-side block into a set-operation
// hash table, using a pipeline::SetSinkLocalState for the arena and the shared
// memory counter.
//
// Template parameters:
//   HashTableContext - hash-table method type; must provide the nested types
//                      State and Mapped, a `hash_table` member, and the
//                      init_serialized_keys / lazy_emplace / try_presis_key
//                      functions used below.
//   is_intersect     - only used here to select the SetSinkLocalState
//                      instantiation accepted by operator().
template <class HashTableContext, bool is_intersect>
struct HashTableBuildX {
// rows:           number of rows to insert.
// build_raw_ptrs: key columns; held by reference, so it must outlive this object.
// offset:         stored in every Mapped value created by this build
//                 (presumably identifies the source block; verify against caller).
// state:          passed to RETURN_IF_CANCELLED during the insert loop.
HashTableBuildX(int rows, ColumnRawPtrs& build_raw_ptrs, uint8_t offset, RuntimeState* state)
: _rows(rows), _offset(offset), _build_raw_ptrs(build_raw_ptrs), _state(state) {}

// Inserts rows [0, _rows) into hash_table_ctx.
// Returns Status::OK() once the loop finishes; RETURN_IF_CANCELLED presumably
// returns early with a non-OK status when the query has been cancelled.
Status operator()(pipeline::SetSinkLocalState<is_intersect>& local_state,
HashTableContext& hash_table_ctx) {
using KeyGetter = typename HashTableContext::State;
using Mapped = typename HashTableContext::Mapped;
// Buffer size before inserting, used to compute the growth caused by this call.
int64_t old_bucket_bytes = hash_table_ctx.hash_table->get_buffer_size_in_bytes();

// Adds the change in hash-table buffer size to the shared mem_used counter.
// NOTE(review): Defer presumably runs this lambda on scope exit, so the
// early-return path would be accounted for as well; confirm.
Defer defer {[&]() {
int64_t bucket_bytes = hash_table_ctx.hash_table->get_buffer_size_in_bytes();
local_state._shared_state->mem_used += bucket_bytes - old_bucket_bytes;
}};

KeyGetter key_getter(_build_raw_ptrs);
hash_table_ctx.init_serialized_keys(_build_raw_ptrs, _rows);

// k is captured by reference by both callbacks, so each sees the current row index.
size_t k = 0;
// Callback for a key not yet in the table: persists the key via try_presis_key
// with the local state's arena, then constructs the mapped value {row, offset}.
auto creator = [&](const auto& ctor, auto& key, auto& origin) {
HashTableContext::try_presis_key(key, origin, local_state._arena);
ctor(key, Mapped {k, _offset});
};
// Callback for the null key: only the mapped value is assigned.
auto creator_for_null_key = [&](auto& mapped) { mapped = {k, _offset}; };

// NOTE(review): k is size_t while _rows is int, so the comparison is done in
// unsigned arithmetic; assumes _rows is non-negative.
for (; k < _rows; ++k) {
// Check for cancellation once every CHECK_FRECUENCY rows rather than on every row.
if (k % CHECK_FRECUENCY == 0) {
RETURN_IF_CANCELLED(_state);
}
hash_table_ctx.lazy_emplace(key_getter, k, creator, creator_for_null_key);
}
return Status::OK();
}

private:
// Number of rows to insert.
const int _rows;
// Value stored alongside the row index in each Mapped entry.
const uint8_t _offset;
// Key columns of the build block (not owned).
ColumnRawPtrs& _build_raw_ptrs;
// Runtime state consulted for cancellation (not owned).
RuntimeState* _state;
// Not referenced anywhere in the visible body of this struct.
std::vector<size_t> _build_side_hash_values;
};

} // namespace doris::vectorized
52 changes: 7 additions & 45 deletions be/src/vec/common/hash_table/hash_table_set_probe.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,11 @@ namespace doris::vectorized {

template <class HashTableContext, bool is_intersected>
struct HashTableProbe {
HashTableProbe(VSetOperationNode<is_intersected>* operation_node, int probe_rows)
: _operation_node(operation_node),
template <typename Parent>
HashTableProbe(Parent* parent, int probe_rows)
: _valid_element_in_hash_tbl(parent->valid_element_in_hash_tbl()),
_probe_rows(probe_rows),
_probe_raw_ptrs(operation_node->_probe_columns) {}
_probe_raw_ptrs(parent->_probe_columns) {}

Status mark_data_in_hashtable(HashTableContext& hash_table_ctx) {
using KeyGetter = typename HashTableContext::State;
Expand All @@ -43,49 +44,9 @@ struct HashTableProbe {
if (!(it->visited)) {
it->visited = true;
if constexpr (is_intersected) { //intersected
_operation_node->_valid_element_in_hash_tbl++;
(*_valid_element_in_hash_tbl)++;
} else {
_operation_node->_valid_element_in_hash_tbl--; //except
}
}
}
}
} else {
LOG(FATAL) << "Invalid RowRefListType!";
}
return Status::OK();
}

private:
VSetOperationNode<is_intersected>* _operation_node;
const size_t _probe_rows;
ColumnRawPtrs& _probe_raw_ptrs;
std::vector<StringRef> _probe_keys;
};

template <class HashTableContext, bool is_intersected>
struct HashTableProbeX {
HashTableProbeX(pipeline::SetProbeSinkLocalState<is_intersected>& local_state, int probe_rows)
: _probe_rows(probe_rows), _probe_raw_ptrs(local_state._probe_columns) {}

Status mark_data_in_hashtable(pipeline::SetProbeSinkLocalState<is_intersected>& local_state,
HashTableContext& hash_table_ctx) {
using KeyGetter = typename HashTableContext::State;

KeyGetter key_getter(_probe_raw_ptrs);
hash_table_ctx.init_serialized_keys(_probe_raw_ptrs, _probe_rows);

if constexpr (std::is_same_v<typename HashTableContext::Mapped, RowRefListWithFlags>) {
for (int probe_index = 0; probe_index < _probe_rows; probe_index++) {
auto find_result = hash_table_ctx.find(key_getter, probe_index);
if (find_result.is_found()) { //if found, marked visited
auto it = find_result.get_mapped().begin();
if (!(it->visited)) {
it->visited = true;
if constexpr (is_intersected) { //intersected
local_state._shared_state->valid_element_in_hash_tbl++;
} else {
local_state._shared_state->valid_element_in_hash_tbl--; //except
(*_valid_element_in_hash_tbl)--; //except
}
}
}
Expand All @@ -97,6 +58,7 @@ struct HashTableProbeX {
}

private:
int64_t* _valid_element_in_hash_tbl;
const size_t _probe_rows;
ColumnRawPtrs& _probe_raw_ptrs;
std::vector<StringRef> _probe_keys;
Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/exec/vset_operation_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ Status VSetOperationNode<is_intersect>::process_build_block(Block& block, uint8_
using HashTableCtxType = std::decay_t<decltype(arg)>;
if constexpr (!std::is_same_v<HashTableCtxType, std::monostate>) {
HashTableBuild<HashTableCtxType, is_intersect> hash_table_build_process(
rows, raw_ptrs, this, offset, state);
this, rows, raw_ptrs, offset, state);
st = hash_table_build_process(arg, _arena);
} else {
LOG(FATAL) << "FATAL: uninited hash table";
Expand Down
3 changes: 3 additions & 0 deletions be/src/vec/exec/vset_operation_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ class VSetOperationNode final : public ExecNode {

bool is_child_finished(int child_id) const;

int64_t* valid_element_in_hash_tbl() { return &_valid_element_in_hash_tbl; }
int64_t* mem_used() { return &_mem_used; };

private:
void _finalize_probe(int child_id);
//Todo: In build process of hashtable, It's same as join node.
Expand Down

0 comments on commit 31a5e07

Please sign in to comment.