Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Enhancement] add more profile and metrics to measure runtime bloom filter size (backport #46360) #46406

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
592 changes: 592 additions & 0 deletions be/src/exec/hash_joiner.cpp

Large diffs are not rendered by default.

460 changes: 460 additions & 0 deletions be/src/exec/hash_joiner.h

Large diffs are not rendered by default.

41 changes: 41 additions & 0 deletions be/src/exec/pipeline/hashjoin/hash_join_build_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,14 @@

#include "exec/pipeline/hashjoin/hash_join_build_operator.h"

<<<<<<< HEAD
=======
#include <numeric>
#include <utility>

>>>>>>> 16ec9c16fe ([Enhancement] add more profile and metrics to measure runtime bloom filter size (#46360))
#include "exec/pipeline/query_context.h"
#include "exprs/runtime_filter_bank.h"
#include "runtime/current_thread.h"
#include "runtime/runtime_filter_worker.h"
namespace starrocks::pipeline {
Expand Down Expand Up @@ -88,6 +95,7 @@ Status HashJoinBuildOperator::set_finishing(RuntimeState* state) {
auto&& in_filters = _partial_rf_merger->get_total_in_filters();
auto&& bloom_filters = _partial_rf_merger->get_total_bloom_filters();

<<<<<<< HEAD
// publish runtime bloom-filters
state->runtime_filter_port()->publish_runtime_filters(bloom_filters);
// move runtime filters into RuntimeFilterHub.
Expand All @@ -98,6 +106,39 @@ Status HashJoinBuildOperator::set_finishing(RuntimeState* state) {
TRY_CATCH_ALLOC_SCOPE_START()
for (auto& read_only_join_prober : _read_only_join_probers) {
read_only_join_prober->reference_hash_table(_join_builder.get());
=======
} else {
// add partial filters generated by this HashJoinBuildOperator to PartialRuntimeFilterMerger to merge into a
// total one.
bool all_build_merged = false;
{
SCOPED_TIMER(_join_builder->build_metrics().build_runtime_filter_timer);
auto status = _partial_rf_merger->add_partial_filters(
merger_index, ht_row_count, std::move(partial_in_filters),
std::move(partial_bloom_filter_build_params), std::move(partial_bloom_filters));
ASSIGN_OR_RETURN(all_build_merged, status);
}

if (all_build_merged) {
auto&& in_filters = _partial_rf_merger->get_total_in_filters();
auto&& bloom_filters = _partial_rf_merger->get_total_bloom_filters();

{
size_t total_bf_bytes = std::accumulate(bloom_filters.begin(), bloom_filters.end(), 0ull,
[](size_t total, RuntimeFilterBuildDescriptor* desc) -> size_t {
auto rf = desc->runtime_filter();
total += (rf == nullptr ? 0 : rf->bf_alloc_size());
return total;
});
COUNTER_UPDATE(_join_builder->build_metrics().partial_runtime_bloom_filter_bytes, total_bf_bytes);
}

// publish runtime bloom-filters
state->runtime_filter_port()->publish_runtime_filters(bloom_filters);
// move runtime filters into RuntimeFilterHub.
runtime_filter_hub()->set_collector(_plan_node_id,
std::make_unique<RuntimeFilterCollector>(std::move(in_filters)));
>>>>>>> 16ec9c16fe ([Enhancement] add more profile and metrics to measure runtime bloom filter size (#46360))
}
TRY_CATCH_ALLOC_SCOPE_END()
}
Expand Down
Loading
Loading