Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Enhancement] add more profile and metrics to measure runtime bloom filter size (backport #46360) #46407

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions be/src/exec/hash_joiner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,13 @@ void HashJoinBuildMetrics::prepare(RuntimeProfile* runtime_profile) {
build_buckets_counter =
ADD_COUNTER_SKIP_MERGE(runtime_profile, "BuildBuckets", TUnit::UNIT, TCounterMergeType::SKIP_FIRST_MERGE);
runtime_filter_num = ADD_COUNTER(runtime_profile, "RuntimeFilterNum", TUnit::UNIT);
<<<<<<< HEAD
=======
build_keys_per_bucket = ADD_COUNTER(runtime_profile, "BuildKeysPerBucket%", TUnit::UNIT);
hash_table_memory_usage = ADD_COUNTER(runtime_profile, "HashTableMemoryUsage", TUnit::BYTES);

partial_runtime_bloom_filter_bytes = ADD_COUNTER(runtime_profile, "PartialRuntimeBloomFilterBytes", TUnit::BYTES);
>>>>>>> 16ec9c16fe ([Enhancement] add more profile and metrics to measure runtime bloom filter size (#46360))
}

HashJoiner::HashJoiner(const HashJoinerParam& param)
Expand Down
2 changes: 2 additions & 0 deletions be/src/exec/hash_joiner.h
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,8 @@ struct HashJoinBuildMetrics {
RuntimeProfile::Counter* build_buckets_counter = nullptr;
RuntimeProfile::Counter* runtime_filter_num = nullptr;

RuntimeProfile::Counter* partial_runtime_bloom_filter_bytes = nullptr;

void prepare(RuntimeProfile* runtime_profile);
};

Expand Down
37 changes: 37 additions & 0 deletions be/src/exec/pipeline/hashjoin/hash_join_build_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@

#include "exec/pipeline/hashjoin/hash_join_build_operator.h"

#include <numeric>
#include <utility>

#include "exec/pipeline/query_context.h"
#include "exprs/runtime_filter_bank.h"
#include "runtime/current_thread.h"
#include "runtime/runtime_filter_worker.h"
#include "util/race_detect.h"
Expand Down Expand Up @@ -120,11 +122,46 @@ Status HashJoinBuildOperator::set_finishing(RuntimeState* state) {
auto&& in_filters = _partial_rf_merger->get_total_in_filters();
auto&& bloom_filters = _partial_rf_merger->get_total_bloom_filters();

<<<<<<< HEAD
// publish runtime bloom-filters
state->runtime_filter_port()->publish_runtime_filters(bloom_filters);
// move runtime filters into RuntimeFilterHub.
runtime_filter_hub()->set_collector(_plan_node_id, std::make_unique<RuntimeFilterCollector>(
std::move(in_filters), std::move(bloom_filters)));
=======
} else {
// add partial filters generated by this HashJoinBuildOperator to PartialRuntimeFilterMerger to merge into a
// total one.
bool all_build_merged = false;
{
SCOPED_TIMER(_join_builder->build_metrics().build_runtime_filter_timer);
auto status = _partial_rf_merger->add_partial_filters(
merger_index, ht_row_count, std::move(partial_in_filters),
std::move(partial_bloom_filter_build_params), std::move(partial_bloom_filters));
ASSIGN_OR_RETURN(all_build_merged, status);
}

if (all_build_merged) {
auto&& in_filters = _partial_rf_merger->get_total_in_filters();
auto&& bloom_filters = _partial_rf_merger->get_total_bloom_filters();

{
size_t total_bf_bytes = std::accumulate(bloom_filters.begin(), bloom_filters.end(), 0ull,
[](size_t total, RuntimeFilterBuildDescriptor* desc) -> size_t {
auto rf = desc->runtime_filter();
total += (rf == nullptr ? 0 : rf->bf_alloc_size());
return total;
});
COUNTER_UPDATE(_join_builder->build_metrics().partial_runtime_bloom_filter_bytes, total_bf_bytes);
}

// publish runtime bloom-filters
state->runtime_filter_port()->publish_runtime_filters(bloom_filters);
// move runtime filters into RuntimeFilterHub.
runtime_filter_hub()->set_collector(_plan_node_id,
std::make_unique<RuntimeFilterCollector>(std::move(in_filters)));
}
>>>>>>> 16ec9c16fe ([Enhancement] add more profile and metrics to measure runtime bloom filter size (#46360))
}

_join_builder->enter_probe_phase();
Expand Down
19 changes: 15 additions & 4 deletions be/src/exprs/runtime_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

#pragma once

#include <numeric>

#include "column/chunk.h"
#include "column/column_hash.h"
#include "column/column_helper.h"
Expand Down Expand Up @@ -162,6 +164,10 @@ class SimdBlockFilter {
// we can use can_use() to check if this bloom filter can be used
bool can_use() const { return _directory != nullptr; }

// Size in bytes of the bucket directory: 2^_log_num_buckets buckets of
// 2^LOG_BUCKET_BYTE_SIZE bytes each. Reports 0 while `_log_num_buckets` is 0.
size_t get_alloc_size() const {
    if (_log_num_buckets == 0) {
        return 0;
    }
    return 1ull << (_log_num_buckets + LOG_BUCKET_BYTE_SIZE);
}

private:
// The number of bits to set in a tiny Bloom filter block

Expand All @@ -187,10 +193,6 @@ class SimdBlockFilter {
// log2(number of bytes in a bucket):
static constexpr int LOG_BUCKET_BYTE_SIZE = 5;

// Size in bytes of the bucket directory: 2^_log_num_buckets buckets of
// 2^LOG_BUCKET_BYTE_SIZE bytes each. Reports 0 while `_log_num_buckets` is 0.
size_t get_alloc_size() const {
    if (_log_num_buckets == 0) {
        return 0;
    }
    return 1ull << (_log_num_buckets + LOG_BUCKET_BYTE_SIZE);
}

// Common:
// log_num_buckets_ is the log (base 2) of the number of buckets in the directory:
int _log_num_buckets = 0;
Expand Down Expand Up @@ -292,6 +294,15 @@ class JoinRuntimeFilter {
return _hash_partition_bf[0].can_use();
}

size_t bf_alloc_size() const {
if (_hash_partition_bf.empty()) {
return _bf.get_alloc_size();
}
return std::accumulate(
_hash_partition_bf.begin(), _hash_partition_bf.end(), 0ull,
[](size_t total, const SimdBlockFilter& bf) -> size_t { return total + bf.get_alloc_size(); });
}

// RuntimeFilter version
// if the RuntimeFilter is updated, the version will be updated as well,
// (usually used for TopN Filter)
Expand Down
41 changes: 32 additions & 9 deletions be/src/runtime/runtime_filter_worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ void RuntimeFilterPort::receive_shared_runtime_filter(int32_t filter_id,
rf_desc->set_shared_runtime_filter(rf);
}
}

// Constructs a merger for one query. The constructor only initializes the members
// (`_exec_env`, `_query_id`, `_query_options`, `_is_pipeline`) from its arguments;
// it performs no other work.
RuntimeFilterMerger::RuntimeFilterMerger(ExecEnv* env, const UniqueId& query_id, const TQueryOptions& query_options,
bool is_pipeline)
: _exec_env(env), _query_id(query_id), _query_options(query_options), _is_pipeline(is_pipeline) {}
Expand Down Expand Up @@ -469,15 +470,6 @@ void RuntimeFilterMerger::_send_total_runtime_filter(int rf_version, int32_t fil
pool->clear();
}

// Kinds of events placed on the RuntimeFilterWorker queue; execute() switches on this value.
enum EventType {
// Handled in execute() by _receive_total_runtime_filter().
RECEIVE_TOTAL_RF = 0,
// Enqueued by close_query().
CLOSE_QUERY = 1,
// Presumably enqueued by open_query() -- the assignment is outside the visible hunk.
OPEN_QUERY = 2,
// Handled in execute() by looking up the query's merger in `_mergers`.
RECEIVE_PART_RF = 3,
// Handled in execute() by _deliver_part_runtime_filter().
SEND_PART_RF = 4,
// Handled in execute() by _process_send_broadcast_runtime_filter_event().
SEND_BROADCAST_GRF = 5,
};

struct RuntimeFilterWorkerEvent {
public:
RuntimeFilterWorkerEvent() = default;
Expand All @@ -504,9 +496,19 @@ static_assert(std::is_move_assignable<RuntimeFilterWorkerEvent>::value);

// Creates the worker. `_thread` is constructed in the initializer list with a lambda
// that calls execute(), so the worker thread exists before the constructor body runs.
// NOTE(review): `_metrics` is assigned only in the body, after the thread has been
// created. The visible part of execute() touches `_metrics` only after an event has
// been dequeued -- confirm no earlier access exists.
RuntimeFilterWorker::RuntimeFilterWorker(ExecEnv* env) : _exec_env(env), _thread([this] { execute(); }) {
Thread::set_thread_name(_thread, "runtime_filter");
// Raw allocation held in a plain pointer member; it has to be released explicitly.
_metrics = new RuntimeFilterWorkerMetrics();
}

// Stops the worker thread and releases the metrics object.
//
// Order matters: the queue is shut down and the thread joined *before* `_metrics`
// is freed, because execute() dereferences `_metrics` for every event it dequeues.
// NOTE(review): the conflicting incoming change moved shutdown/join into a separate
// close() method. No declaration of close() is visible in the header here, so the
// destructor keeps doing the shutdown itself -- confirm against the target branch.
RuntimeFilterWorker::~RuntimeFilterWorker() {
    _queue.shutdown();
    _thread.join();
    delete _metrics; // deleting nullptr is a no-op, so no explicit check is needed
    _metrics = nullptr;
}
Expand All @@ -520,6 +522,7 @@ void RuntimeFilterWorker::open_query(const TUniqueId& query_id, const TQueryOpti
ev.query_options = query_options;
ev.create_rf_merger_request = params;
ev.is_opened_by_pipeline = is_pipeline;
_metrics->update_event_nums(ev.type, 1);
_queue.put(std::move(ev));
}

Expand All @@ -528,6 +531,7 @@ void RuntimeFilterWorker::close_query(const TUniqueId& query_id) {
RuntimeFilterWorkerEvent ev;
ev.type = CLOSE_QUERY;
ev.query_id = query_id;
_metrics->update_event_nums(ev.type, 1);
_queue.put(std::move(ev));
}

Expand All @@ -539,6 +543,8 @@ void RuntimeFilterWorker::send_part_runtime_filter(PTransmitRuntimeFilterParams&
ev.transmit_timeout_ms = timeout_ms;
ev.transmit_addrs = addrs;
ev.transmit_rf_request = std::move(params);
_metrics->update_event_nums(ev.type, 1);
_metrics->update_rf_bytes(ev.type, ev.transmit_rf_request.data().size());
_queue.put(std::move(ev));
}

Expand All @@ -551,6 +557,8 @@ void RuntimeFilterWorker::send_broadcast_runtime_filter(PTransmitRuntimeFilterPa
ev.transmit_timeout_ms = timeout_ms;
ev.destinations = destinations;
ev.transmit_rf_request = std::move(params);
_metrics->update_event_nums(ev.type, 1);
_metrics->update_rf_bytes(ev.type, ev.transmit_rf_request.data().size());
_queue.put(std::move(ev));
}

Expand All @@ -571,8 +579,11 @@ void RuntimeFilterWorker::receive_runtime_filter(const PTransmitRuntimeFilterPar
ev.query_id.hi = params.query_id().hi();
ev.query_id.lo = params.query_id().lo();
ev.transmit_rf_request = params;
_metrics->update_event_nums(ev.type, 1);
_metrics->update_rf_bytes(ev.type, ev.transmit_rf_request.data().size());
_queue.put(std::move(ev));
}

// receive total runtime filter in pipeline engine.
static inline Status receive_total_runtime_filter_pipeline(PTransmitRuntimeFilterParams& params,
const std::shared_ptr<JoinRuntimeFilter>& shared_rf) {
Expand Down Expand Up @@ -863,8 +874,17 @@ void RuntimeFilterWorker::execute() {
if (!_queue.blocking_get(&ev)) {
break;
}
<<<<<<< HEAD
=======

LOG_IF_EVERY_N(INFO, _queue.get_size() > CpuInfo::num_cores() * 10, 10)
<< "runtime filter worker queue may be too large, size: " << _queue.get_size();

_metrics->update_event_nums(ev.type, -1);
>>>>>>> 16ec9c16fe ([Enhancement] add more profile and metrics to measure runtime bloom filter size (#46360))
switch (ev.type) {
case RECEIVE_TOTAL_RF: {
_metrics->update_rf_bytes(ev.type, -ev.transmit_rf_request.data().size());
_receive_total_runtime_filter(ev.transmit_rf_request);
break;
}
Expand Down Expand Up @@ -894,6 +914,7 @@ void RuntimeFilterWorker::execute() {
}

case RECEIVE_PART_RF: {
_metrics->update_rf_bytes(ev.type, -ev.transmit_rf_request.data().size());
auto it = _mergers.find(ev.query_id);
if (it == _mergers.end()) {
VLOG_QUERY << "receive part rf: rf merger not existed. query_id = " << ev.query_id;
Expand All @@ -907,11 +928,13 @@ void RuntimeFilterWorker::execute() {
}

case SEND_PART_RF: {
_metrics->update_rf_bytes(ev.type, -ev.transmit_rf_request.data().size());
_deliver_part_runtime_filter(std::move(ev.transmit_addrs), std::move(ev.transmit_rf_request),
ev.transmit_timeout_ms);
break;
}
case SEND_BROADCAST_GRF: {
_metrics->update_rf_bytes(ev.type, -ev.transmit_rf_request.data().size());
_process_send_broadcast_runtime_filter_event(std::move(ev.transmit_rf_request), std::move(ev.destinations),
ev.transmit_timeout_ms);
break;
Expand Down
53 changes: 53 additions & 0 deletions be/src/runtime/runtime_filter_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include "gen_cpp/internal_service.pb.h"
#include "util/blocking_queue.hpp"
#include "util/ref_count_closure.h"
#include "util/system_metrics.h"
#include "util/uid_util.h"
namespace starrocks {

Expand Down Expand Up @@ -112,6 +113,35 @@ class RuntimeFilterMerger {
const bool _is_pipeline;
};

// Kinds of events processed by RuntimeFilterWorker.
enum EventType {
    RECEIVE_TOTAL_RF = 0,
    CLOSE_QUERY = 1,
    OPEN_QUERY = 2,
    RECEIVE_PART_RF = 3,
    SEND_PART_RF = 4,
    SEND_BROADCAST_GRF = 5,
    // Number of real event types (used to size per-type arrays); not an event itself.
    MAX_COUNT,
};

// Returns the enumerator name of `type` as a string.
//
// Any value that is not a real event type (including MAX_COUNT) yields "UNKNOWN"
// instead of running into __builtin_unreachable(), which would be undefined behaviour.
// The switch deliberately has no `default:` so the compiler can warn when a new
// enumerator is added without a matching case.
inline std::string EventTypeToString(EventType type) {
    switch (type) {
    case RECEIVE_TOTAL_RF:
        return "RECEIVE_TOTAL_RF";
    case CLOSE_QUERY:
        return "CLOSE_QUERY";
    case OPEN_QUERY:
        return "OPEN_QUERY";
    case RECEIVE_PART_RF:
        return "RECEIVE_PART_RF";
    case SEND_PART_RF:
        return "SEND_PART_RF";
    case SEND_BROADCAST_GRF:
        return "SEND_BROADCAST_GRF";
    case MAX_COUNT:
        break;
    }
    return "UNKNOWN";
}
// RuntimeFilterWorker works in a separate thread and does the following jobs:
// 1. deserialize runtime filters.
// 2. merge runtime filters.
Expand All @@ -122,7 +152,21 @@ class RuntimeFilterMerger {
// - receive total RF and send it to RuntimeFilterPort
// - send partitioned RF(for hash join node)
// - close a query(delete runtime filter merger)
// Forward declaration; declared with `struct` to match the definition in the .cpp file.
struct RuntimeFilterWorkerEvent;

// Per-event-type gauges for the RuntimeFilterWorker queue.
// The worker adds to these when an event is enqueued and subtracts when the event is
// taken off the queue / processed, so each slot reflects what is currently pending.
// The slots are atomics because they are updated from both the enqueueing callers
// and the worker thread.
struct RuntimeFilterWorkerMetrics {
    // Adds `delta` (may be negative) to the number of pending events of `event_type`.
    void update_event_nums(EventType event_type, int64_t delta) { event_nums[event_type] += delta; }

    // Adds `delta` (may be negative) to the pending runtime-filter bytes of `event_type`.
    void update_rf_bytes(EventType event_type, int64_t delta) { runtime_filter_bytes[event_type] += delta; }

    // Indexed by EventType; value-initialized, so every slot starts at zero.
    std::array<std::atomic_int64_t, EventType::MAX_COUNT> event_nums{};
    std::array<std::atomic_int64_t, EventType::MAX_COUNT> runtime_filter_bytes{};
};

class RuntimeFilterWorker {
public:
RuntimeFilterWorker(ExecEnv* env);
Expand All @@ -136,7 +180,15 @@ class RuntimeFilterWorker {
void send_part_runtime_filter(PTransmitRuntimeFilterParams&& params,
const std::vector<starrocks::TNetworkAddress>& addrs, int timeout_ms);
void send_broadcast_runtime_filter(PTransmitRuntimeFilterParams&& params,
<<<<<<< HEAD
const std::vector<TRuntimeFilterDestination>& destinations, int timeout_ms);
=======
const std::vector<TRuntimeFilterDestination>& destinations, int timeout_ms,
int64_t rpc_http_min_size);

size_t queue_size() const;
const RuntimeFilterWorkerMetrics* metrics() const { return _metrics; }
>>>>>>> 16ec9c16fe ([Enhancement] add more profile and metrics to measure runtime bloom filter size (#46360))

private:
void _receive_total_runtime_filter(PTransmitRuntimeFilterParams& params);
Expand All @@ -158,6 +210,7 @@ class RuntimeFilterWorker {
std::unordered_map<TUniqueId, RuntimeFilterMerger> _mergers;
ExecEnv* _exec_env;
std::thread _thread;
RuntimeFilterWorkerMetrics* _metrics = nullptr;
};

}; // namespace starrocks
Loading
Loading