From cecfad4d384ec44de292abacd60ae0c76af0ac63 Mon Sep 17 00:00:00 2001 From: silverbullet233 <3675229+silverbullet233@users.noreply.github.com> Date: Tue, 28 May 2024 16:38:47 +0800 Subject: [PATCH 1/3] more profile and metrics to measure runtime bloom filter size Signed-off-by: silverbullet233 <3675229+silverbullet233@users.noreply.github.com> --- be/src/exec/hash_joiner.cpp | 2 + be/src/exec/hash_joiner.h | 2 + .../hashjoin/hash_join_build_operator.cpp | 12 +++++ be/src/exprs/runtime_filter.h | 19 +++++-- be/src/runtime/runtime_filter_worker.cpp | 32 ++++++++---- be/src/runtime/runtime_filter_worker.h | 50 +++++++++++++++++++ be/src/util/system_metrics.cpp | 42 ++++++++++++++++ be/src/util/system_metrics.h | 6 +++ 8 files changed, 151 insertions(+), 14 deletions(-) diff --git a/be/src/exec/hash_joiner.cpp b/be/src/exec/hash_joiner.cpp index 3f21b62eada8e..6b5fb7e954049 100644 --- a/be/src/exec/hash_joiner.cpp +++ b/be/src/exec/hash_joiner.cpp @@ -50,6 +50,8 @@ void HashJoinBuildMetrics::prepare(RuntimeProfile* runtime_profile) { runtime_filter_num = ADD_COUNTER(runtime_profile, "RuntimeFilterNum", TUnit::UNIT); build_keys_per_bucket = ADD_COUNTER(runtime_profile, "BuildKeysPerBucket%", TUnit::UNIT); hash_table_memory_usage = ADD_COUNTER(runtime_profile, "HashTableMemoryUsage", TUnit::BYTES); + + partial_runtime_bloom_filter_bytes = ADD_COUNTER(runtime_profile, "PartialRuntimeBloomFilterBytes", TUnit::BYTES); } HashJoiner::HashJoiner(const HashJoinerParam& param) diff --git a/be/src/exec/hash_joiner.h b/be/src/exec/hash_joiner.h index a7af7079e9ad6..6a08c86ddb27e 100644 --- a/be/src/exec/hash_joiner.h +++ b/be/src/exec/hash_joiner.h @@ -156,6 +156,8 @@ struct HashJoinBuildMetrics { RuntimeProfile::Counter* build_keys_per_bucket = nullptr; RuntimeProfile::Counter* hash_table_memory_usage = nullptr; + RuntimeProfile::Counter* partial_runtime_bloom_filter_bytes = nullptr; + void prepare(RuntimeProfile* runtime_profile); }; diff --git
a/be/src/exec/pipeline/hashjoin/hash_join_build_operator.cpp b/be/src/exec/pipeline/hashjoin/hash_join_build_operator.cpp index 3d9431b3ee96b..316f3ac4c9191 100644 --- a/be/src/exec/pipeline/hashjoin/hash_join_build_operator.cpp +++ b/be/src/exec/pipeline/hashjoin/hash_join_build_operator.cpp @@ -14,9 +14,11 @@ #include "exec/pipeline/hashjoin/hash_join_build_operator.h" +#include #include #include "exec/pipeline/query_context.h" +#include "exprs/runtime_filter_bank.h" #include "runtime/current_thread.h" #include "runtime/runtime_filter_worker.h" #include "util/race_detect.h" @@ -142,6 +144,16 @@ Status HashJoinBuildOperator::set_finishing(RuntimeState* state) { auto&& in_filters = _partial_rf_merger->get_total_in_filters(); auto&& bloom_filters = _partial_rf_merger->get_total_bloom_filters(); + { + size_t total_bf_bytes = std::accumulate(bloom_filters.begin(), bloom_filters.end(), 0ull, + [](size_t total, RuntimeFilterBuildDescriptor* desc) -> size_t { + auto rf = desc->runtime_filter(); + total += (rf == nullptr ? 0 : rf->bf_alloc_size()); + return total; + }); + COUNTER_UPDATE(_join_builder->build_metrics().partial_runtime_bloom_filter_bytes, total_bf_bytes); + } + // publish runtime bloom-filters state->runtime_filter_port()->publish_runtime_filters(bloom_filters); // move runtime filters into RuntimeFilterHub. diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h index 306e1aabe0879..6a4cd90a412f3 100644 --- a/be/src/exprs/runtime_filter.h +++ b/be/src/exprs/runtime_filter.h @@ -14,6 +14,8 @@ #pragma once +#include + #include "column/chunk.h" #include "column/column_hash.h" #include "column/column_helper.h" @@ -182,6 +184,10 @@ class SimdBlockFilter { // we can use can_use() to check if this bloom filter can be used bool can_use() const { return _directory != nullptr; } + size_t get_alloc_size() const { + return _log_num_buckets == 0 ? 
0 : (1ull << (_log_num_buckets + LOG_BUCKET_BYTE_SIZE)); + } + private: // The number of bits to set in a tiny Bloom filter block @@ -224,10 +230,6 @@ class SimdBlockFilter { // log2(number of bytes in a bucket): static constexpr int LOG_BUCKET_BYTE_SIZE = 5; - size_t get_alloc_size() const { - return _log_num_buckets == 0 ? 0 : (1ull << (_log_num_buckets + LOG_BUCKET_BYTE_SIZE)); - } - // Common: // log_num_buckets_ is the log (base 2) of the number of buckets in the directory: int _log_num_buckets = 0; @@ -332,6 +334,15 @@ class JoinRuntimeFilter { return _hash_partition_bf[0].can_use(); } + size_t bf_alloc_size() const { + if (_hash_partition_bf.empty()) { + return _bf.get_alloc_size(); + } + return std::accumulate( + _hash_partition_bf.begin(), _hash_partition_bf.end(), 0ull, + [](size_t total, const SimdBlockFilter& bf) -> size_t { return total + bf.get_alloc_size(); }); + } + // RuntimeFilter version // if the RuntimeFilter is updated, the version will be updated as well, // (usually used for TopN Filter) diff --git a/be/src/runtime/runtime_filter_worker.cpp b/be/src/runtime/runtime_filter_worker.cpp index bf47eb121d60a..d11bf0a5930a2 100644 --- a/be/src/runtime/runtime_filter_worker.cpp +++ b/be/src/runtime/runtime_filter_worker.cpp @@ -218,6 +218,7 @@ void RuntimeFilterPort::receive_shared_runtime_filter(int32_t filter_id, rf_desc->set_shared_runtime_filter(rf); } } + RuntimeFilterMerger::RuntimeFilterMerger(ExecEnv* env, const UniqueId& query_id, const TQueryOptions& query_options, bool is_pipeline) : _exec_env(env), _query_id(query_id), _query_options(query_options), _is_pipeline(is_pipeline) {} @@ -496,15 +497,6 @@ void RuntimeFilterMerger::_send_total_runtime_filter(int rf_version, int32_t fil pool->clear(); } -enum EventType { - RECEIVE_TOTAL_RF = 0, - CLOSE_QUERY = 1, - OPEN_QUERY = 2, - RECEIVE_PART_RF = 3, - SEND_PART_RF = 4, - SEND_BROADCAST_GRF = 5, -}; - struct RuntimeFilterWorkerEvent { public: RuntimeFilterWorkerEvent() = default; @@ -532,9 
+524,14 @@ static_assert(std::is_move_assignable::value); RuntimeFilterWorker::RuntimeFilterWorker(ExecEnv* env) : _exec_env(env), _thread([this] { execute(); }) { Thread::set_thread_name(_thread, "runtime_filter"); + _metrics = new RuntimeFilterWorkerMetrics(); } -RuntimeFilterWorker::~RuntimeFilterWorker() = default; +RuntimeFilterWorker::~RuntimeFilterWorker() { + if (_metrics) { + delete _metrics; + } +} void RuntimeFilterWorker::close() { _queue.shutdown(); @@ -550,6 +547,7 @@ void RuntimeFilterWorker::open_query(const TUniqueId& query_id, const TQueryOpti ev.query_options = query_options; ev.create_rf_merger_request = params; ev.is_opened_by_pipeline = is_pipeline; + _metrics->update_event_nums(ev.type, 1); _queue.put(std::move(ev)); } @@ -558,6 +556,7 @@ void RuntimeFilterWorker::close_query(const TUniqueId& query_id) { RuntimeFilterWorkerEvent ev; ev.type = CLOSE_QUERY; ev.query_id = query_id; + _metrics->update_event_nums(ev.type, 1); _queue.put(std::move(ev)); } @@ -571,6 +570,8 @@ void RuntimeFilterWorker::send_part_runtime_filter(PTransmitRuntimeFilterParams& ev.transmit_via_http_min_size = rpc_http_min_size; ev.transmit_addrs = addrs; ev.transmit_rf_request = std::move(params); + _metrics->update_event_nums(ev.type, 1); + _metrics->update_rf_bytes(ev.type, ev.transmit_rf_request.data().size()); _queue.put(std::move(ev)); } @@ -584,6 +585,8 @@ void RuntimeFilterWorker::send_broadcast_runtime_filter(PTransmitRuntimeFilterPa ev.transmit_via_http_min_size = rpc_http_min_size; ev.destinations = destinations; ev.transmit_rf_request = std::move(params); + _metrics->update_event_nums(ev.type, 1); + _metrics->update_rf_bytes(ev.type, ev.transmit_rf_request.data().size()); _queue.put(std::move(ev)); } @@ -604,8 +607,11 @@ void RuntimeFilterWorker::receive_runtime_filter(const PTransmitRuntimeFilterPar ev.query_id.hi = params.query_id().hi(); ev.query_id.lo = params.query_id().lo(); ev.transmit_rf_request = params; + _metrics->update_event_nums(ev.type, 1); + 
_metrics->update_rf_bytes(ev.type, ev.transmit_rf_request.data().size()); _queue.put(std::move(ev)); } + // receive total runtime filter in pipeline engine. static inline void receive_total_runtime_filter_pipeline(PTransmitRuntimeFilterParams& params, const std::shared_ptr& shared_rf) { @@ -876,6 +882,7 @@ void RuntimeFilterWorker::_deliver_part_runtime_filter(std::vector CpuInfo::num_cores() * 10, 10) << "runtime filter worker queue may be too large, size: " << _queue.get_size(); + _metrics->update_event_nums(ev.type, -1); switch (ev.type) { case RECEIVE_TOTAL_RF: { + _metrics->update_rf_bytes(ev.type, -ev.transmit_rf_request.data().size()); _receive_total_runtime_filter(ev.transmit_rf_request); break; } @@ -915,6 +924,7 @@ void RuntimeFilterWorker::execute() { } case RECEIVE_PART_RF: { + _metrics->update_rf_bytes(ev.type, -ev.transmit_rf_request.data().size()); auto it = _mergers.find(ev.query_id); if (it == _mergers.end()) { VLOG_QUERY << "receive part rf: rf merger not existed. query_id = " << ev.query_id; @@ -928,11 +938,13 @@ void RuntimeFilterWorker::execute() { } case SEND_PART_RF: { + _metrics->update_rf_bytes(ev.type, -ev.transmit_rf_request.data().size()); _deliver_part_runtime_filter(std::move(ev.transmit_addrs), std::move(ev.transmit_rf_request), ev.transmit_timeout_ms, ev.transmit_via_http_min_size); break; } case SEND_BROADCAST_GRF: { + _metrics->update_rf_bytes(ev.type, -ev.transmit_rf_request.data().size()); _process_send_broadcast_runtime_filter_event(std::move(ev.transmit_rf_request), std::move(ev.destinations), ev.transmit_timeout_ms, ev.transmit_via_http_min_size); break; diff --git a/be/src/runtime/runtime_filter_worker.h b/be/src/runtime/runtime_filter_worker.h index c0cbd0d338613..5dfd9839c1058 100644 --- a/be/src/runtime/runtime_filter_worker.h +++ b/be/src/runtime/runtime_filter_worker.h @@ -29,6 +29,7 @@ #include "gen_cpp/internal_service.pb.h" #include "util/blocking_queue.hpp" #include "util/ref_count_closure.h" +#include 
"util/system_metrics.h" #include "util/uid_util.h" namespace starrocks { @@ -113,6 +114,35 @@ class RuntimeFilterMerger { const bool _is_pipeline; }; +enum EventType { + RECEIVE_TOTAL_RF = 0, + CLOSE_QUERY = 1, + OPEN_QUERY = 2, + RECEIVE_PART_RF = 3, + SEND_PART_RF = 4, + SEND_BROADCAST_GRF = 5, + MAX_COUNT, +}; + +inline std::string EventTypeToString(EventType type) { + switch (type) { + case RECEIVE_TOTAL_RF: + return "RECEIVE_TOTAL_RF"; + case CLOSE_QUERY: + return "CLOSE_QUERY"; + case OPEN_QUERY: + return "OPEN_QUERY"; + case RECEIVE_PART_RF: + return "RECEIVE_PART_RF"; + case SEND_PART_RF: + return "SEND_PART_RF"; + case SEND_BROADCAST_GRF: + return "SEND_BROADCAST_GRF"; + default: + break; + } + __builtin_unreachable(); +} // RuntimeFilterWorker works in a separated thread, and does following jobs: // 1. deserialize runtime filters. // 2. merge runtime filters. @@ -124,6 +154,24 @@ class RuntimeFilterMerger { // - send partitioned RF(for hash join node) // - close a query(delete runtime filter merger) struct RuntimeFilterWorkerEvent; + +struct RuntimeFilterWorkerMetrics { + void update_event_nums(EventType event_type, int64_t delta) { + LOG(INFO) << "update event num, type: " << EventTypeToString(event_type) + << ", old: " << runtime_filter_bytes[event_type] << ", delta:" << delta; + event_nums[event_type] += delta; + } + + void update_rf_bytes(EventType event_type, int64_t delta) { + LOG(INFO) << "update rf bytes, type: " << EventTypeToString(event_type) + << ", old: " << runtime_filter_bytes[event_type] << ", delta:" << delta; + runtime_filter_bytes[event_type] += delta; + } + + std::array event_nums{}; + std::array runtime_filter_bytes{}; +}; + class RuntimeFilterWorker { public: RuntimeFilterWorker(ExecEnv* env); @@ -143,6 +191,7 @@ class RuntimeFilterWorker { int64_t rpc_http_min_size); size_t queue_size() const; + const RuntimeFilterWorkerMetrics* metrics() const { return _metrics; } private: void 
_receive_total_runtime_filter(PTransmitRuntimeFilterParams& params); @@ -166,6 +215,7 @@ class RuntimeFilterWorker { std::unordered_map _mergers; ExecEnv* _exec_env; std::thread _thread; + RuntimeFilterWorkerMetrics* _metrics = nullptr; }; }; // namespace starrocks diff --git a/be/src/util/system_metrics.cpp b/be/src/util/system_metrics.cpp index c50539d86d82d..68a8347eecd5b 100644 --- a/be/src/util/system_metrics.cpp +++ b/be/src/util/system_metrics.cpp @@ -43,6 +43,8 @@ #include "gutil/strings/split.h" // for string split #include "gutil/strtoint.h" // for atoi64 #include "jemalloc/jemalloc.h" +#include "runtime/runtime_filter_worker.h" +#include "util/metrics.h" namespace starrocks { @@ -118,6 +120,12 @@ class QueryCacheMetrics { METRIC_DEFINE_DOUBLE_GAUGE(query_cache_hit_ratio, MetricUnit::PERCENT); }; +class RuntimeFilterMetrics { +public: + METRIC_DEFINE_INT_GAUGE(runtime_filter_events_in_queue, MetricUnit::NOUNIT); + METRIC_DEFINE_INT_GAUGE(runtime_filter_bytes_in_queue, MetricUnit::BYTES); +}; + SystemMetrics::SystemMetrics() = default; SystemMetrics::~SystemMetrics() { @@ -135,6 +143,9 @@ SystemMetrics::~SystemMetrics() { if (_line_ptr != nullptr) { free(_line_ptr); } + for (auto& it : _runtime_filter_metrics) { + delete it.second; + } } void SystemMetrics::install(MetricRegistry* registry, const std::set& disk_devices, @@ -150,6 +161,7 @@ void SystemMetrics::install(MetricRegistry* registry, const std::setregister_metric("query_cache_hit_ratio", &_query_cache_metrics->query_cache_hit_ratio); } +void SystemMetrics::_install_runtime_filter_metrics(starrocks::MetricRegistry* registry) { + for (int i = 0; i < EventType::MAX_COUNT; i++) { + auto* metrics = new RuntimeFilterMetrics(); + const auto& type = EventTypeToString((EventType)i); +#define REGISTER_RUNTIME_FILTER_METRIC(name) \ + registry->register_metric(#name, MetricLabels().add("type", type), &metrics->name) + REGISTER_RUNTIME_FILTER_METRIC(runtime_filter_events_in_queue); + 
REGISTER_RUNTIME_FILTER_METRIC(runtime_filter_bytes_in_queue); + _runtime_filter_metrics.emplace(type, metrics); + } +} + +void SystemMetrics::_update_runtime_filter_metrics() { + auto* runtime_filter_worker = ExecEnv::GetInstance()->runtime_filter_worker(); + if (UNLIKELY(runtime_filter_worker == nullptr)) { + return; + } + const auto* metrics = runtime_filter_worker->metrics(); + for (int i = 0; i < EventType::MAX_COUNT; i++) { + const auto& event_name = EventTypeToString((EventType)i); + auto iter = _runtime_filter_metrics.find(event_name); + if (iter == _runtime_filter_metrics.end()) { + continue; + } + iter->second->runtime_filter_events_in_queue.set_value(metrics->event_nums[i]); + iter->second->runtime_filter_bytes_in_queue.set_value(metrics->runtime_filter_bytes[i]); + } +} + void SystemMetrics::_update_fd_metrics() { #ifdef BE_TEST FILE* fp = fopen(k_ut_fd_path, "r"); diff --git a/be/src/util/system_metrics.h b/be/src/util/system_metrics.h index 525721c22e210..facc6ba3d2b2e 100644 --- a/be/src/util/system_metrics.h +++ b/be/src/util/system_metrics.h @@ -30,6 +30,7 @@ class NetMetrics; class FileDescriptorMetrics; class SnmpMetrics; class QueryCacheMetrics; +class RuntimeFilterMetrics; class MemoryMetrics { public: @@ -134,6 +135,10 @@ class SystemMetrics { void _update_query_cache_metrics(); + void _install_runtime_filter_metrics(MetricRegistry* registry); + + void _update_runtime_filter_metrics(); + private: static const char* const _s_hook_name; @@ -143,6 +148,7 @@ class SystemMetrics { std::map _net_metrics; std::unique_ptr _fd_metrics; std::unique_ptr _query_cache_metrics; + std::map _runtime_filter_metrics; int _proc_net_dev_version = 0; std::unique_ptr _snmp_metrics; From e74dc5d458a7cf31801c18c3a5d8ff17416034b4 Mon Sep 17 00:00:00 2001 From: silverbullet233 <3675229+silverbullet233@users.noreply.github.com> Date: Tue, 28 May 2024 16:44:09 +0800 Subject: [PATCH 2/3] fix Signed-off-by: silverbullet233 <3675229+silverbullet233@users.noreply.github.com> 
--- be/src/runtime/runtime_filter_worker.cpp | 1 - be/src/runtime/runtime_filter_worker.h | 4 ---- 2 files changed, 5 deletions(-) diff --git a/be/src/runtime/runtime_filter_worker.cpp b/be/src/runtime/runtime_filter_worker.cpp index d11bf0a5930a2..dd0bd2b403c86 100644 --- a/be/src/runtime/runtime_filter_worker.cpp +++ b/be/src/runtime/runtime_filter_worker.cpp @@ -882,7 +882,6 @@ void RuntimeFilterWorker::_deliver_part_runtime_filter(std::vector Date: Wed, 29 May 2024 14:21:51 +0800 Subject: [PATCH 3/3] fix format Signed-off-by: silverbullet233 <3675229+silverbullet233@users.noreply.github.com> --- be/src/runtime/runtime_filter_worker.h | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/be/src/runtime/runtime_filter_worker.h b/be/src/runtime/runtime_filter_worker.h index 2f7f77e154b01..83db444657053 100644 --- a/be/src/runtime/runtime_filter_worker.h +++ b/be/src/runtime/runtime_filter_worker.h @@ -156,13 +156,9 @@ inline std::string EventTypeToString(EventType type) { struct RuntimeFilterWorkerEvent; struct RuntimeFilterWorkerMetrics { - void update_event_nums(EventType event_type, int64_t delta) { - event_nums[event_type] += delta; - } + void update_event_nums(EventType event_type, int64_t delta) { event_nums[event_type] += delta; } - void update_rf_bytes(EventType event_type, int64_t delta) { - runtime_filter_bytes[event_type] += delta; - } + void update_rf_bytes(EventType event_type, int64_t delta) { runtime_filter_bytes[event_type] += delta; } std::array event_nums{}; std::array runtime_filter_bytes{};