From b63f47cb16b90d0620c9a4e5103cb0328984b217 Mon Sep 17 00:00:00 2001 From: eyes_on_me <3675229+silverbullet233@users.noreply.github.com> Date: Wed, 29 May 2024 16:44:14 +0800 Subject: [PATCH] [Enhancement] add more profile and metrics to measure runtime bloom filter size (#46360) Signed-off-by: silverbullet233 <3675229+silverbullet233@users.noreply.github.com> (cherry picked from commit 16ec9c16feb91ceb8c6fc882d55100a224e450da) # Conflicts: # be/src/exec/hash_joiner.cpp # be/src/exec/pipeline/hashjoin/hash_join_build_operator.cpp # be/src/runtime/runtime_filter_worker.cpp # be/src/runtime/runtime_filter_worker.h # be/src/util/system_metrics.cpp --- be/src/exec/hash_joiner.cpp | 7 +++ be/src/exec/hash_joiner.h | 2 + .../hashjoin/hash_join_build_operator.cpp | 37 +++++++++++++ be/src/exprs/runtime_filter.h | 19 +++++-- be/src/runtime/runtime_filter_worker.cpp | 41 ++++++++++---- be/src/runtime/runtime_filter_worker.h | 53 +++++++++++++++++++ be/src/util/system_metrics.cpp | 46 ++++++++++++++++ be/src/util/system_metrics.h | 6 +++ 8 files changed, 198 insertions(+), 13 deletions(-) diff --git a/be/src/exec/hash_joiner.cpp b/be/src/exec/hash_joiner.cpp index aed46e1964e0f..c7e1098331b71 100644 --- a/be/src/exec/hash_joiner.cpp +++ b/be/src/exec/hash_joiner.cpp @@ -54,6 +54,13 @@ void HashJoinBuildMetrics::prepare(RuntimeProfile* runtime_profile) { build_buckets_counter = ADD_COUNTER_SKIP_MERGE(runtime_profile, "BuildBuckets", TUnit::UNIT, TCounterMergeType::SKIP_FIRST_MERGE); runtime_filter_num = ADD_COUNTER(runtime_profile, "RuntimeFilterNum", TUnit::UNIT); +<<<<<<< HEAD +======= + build_keys_per_bucket = ADD_COUNTER(runtime_profile, "BuildKeysPerBucket%", TUnit::UNIT); + hash_table_memory_usage = ADD_COUNTER(runtime_profile, "HashTableMemoryUsage", TUnit::BYTES); + + partial_runtime_bloom_filter_bytes = ADD_COUNTER(runtime_profile, "PartialRuntimeBloomFilterBytes", TUnit::BYTES); +>>>>>>> 16ec9c16fe ([Enhancement] add more profile and metrics to measure runtime bloom filter size (#46360)) } HashJoiner::HashJoiner(const HashJoinerParam& param) diff --git a/be/src/exec/hash_joiner.h b/be/src/exec/hash_joiner.h index fcae3465f0441..3df658eb160b9 100644 --- a/be/src/exec/hash_joiner.h +++ b/be/src/exec/hash_joiner.h @@ -157,6 +157,8 @@ struct HashJoinBuildMetrics { RuntimeProfile::Counter* build_buckets_counter = nullptr; RuntimeProfile::Counter* runtime_filter_num = nullptr; + RuntimeProfile::Counter* partial_runtime_bloom_filter_bytes = nullptr; + void prepare(RuntimeProfile* runtime_profile); }; diff --git a/be/src/exec/pipeline/hashjoin/hash_join_build_operator.cpp b/be/src/exec/pipeline/hashjoin/hash_join_build_operator.cpp index faf835346da17..a76155cd39411 100644 --- a/be/src/exec/pipeline/hashjoin/hash_join_build_operator.cpp +++ b/be/src/exec/pipeline/hashjoin/hash_join_build_operator.cpp @@ -14,9 +14,11 @@ #include "exec/pipeline/hashjoin/hash_join_build_operator.h" +#include #include #include "exec/pipeline/query_context.h" +#include "exprs/runtime_filter_bank.h" #include "runtime/current_thread.h" #include "runtime/runtime_filter_worker.h" #include "util/race_detect.h" @@ -120,11 +122,46 @@ Status HashJoinBuildOperator::set_finishing(RuntimeState* state) { auto&& in_filters = _partial_rf_merger->get_total_in_filters(); auto&& bloom_filters = _partial_rf_merger->get_total_bloom_filters(); +<<<<<<< HEAD // publish runtime bloom-filters state->runtime_filter_port()->publish_runtime_filters(bloom_filters); // move runtime filters into RuntimeFilterHub. runtime_filter_hub()->set_collector(_plan_node_id, std::make_unique( std::move(in_filters), std::move(bloom_filters))); +======= + } else { + // add partial filters generated by this HashJoinBuildOperator to PartialRuntimeFilterMerger to merge into a + // total one. + bool all_build_merged = false; + { + SCOPED_TIMER(_join_builder->build_metrics().build_runtime_filter_timer); + auto status = _partial_rf_merger->add_partial_filters( + merger_index, ht_row_count, std::move(partial_in_filters), + std::move(partial_bloom_filter_build_params), std::move(partial_bloom_filters)); + ASSIGN_OR_RETURN(all_build_merged, status); + } + + if (all_build_merged) { + auto&& in_filters = _partial_rf_merger->get_total_in_filters(); + auto&& bloom_filters = _partial_rf_merger->get_total_bloom_filters(); + + { + size_t total_bf_bytes = std::accumulate(bloom_filters.begin(), bloom_filters.end(), 0ull, + [](size_t total, RuntimeFilterBuildDescriptor* desc) -> size_t { + auto rf = desc->runtime_filter(); + total += (rf == nullptr ? 0 : rf->bf_alloc_size()); + return total; + }); + COUNTER_UPDATE(_join_builder->build_metrics().partial_runtime_bloom_filter_bytes, total_bf_bytes); + } + + // publish runtime bloom-filters + state->runtime_filter_port()->publish_runtime_filters(bloom_filters); + // move runtime filters into RuntimeFilterHub. + runtime_filter_hub()->set_collector(_plan_node_id, + std::make_unique(std::move(in_filters))); + } +>>>>>>> 16ec9c16fe ([Enhancement] add more profile and metrics to measure runtime bloom filter size (#46360)) } _join_builder->enter_probe_phase(); diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h index b5971f298fb56..aa102948891e0 100644 --- a/be/src/exprs/runtime_filter.h +++ b/be/src/exprs/runtime_filter.h @@ -14,6 +14,8 @@ #pragma once +#include + #include "column/chunk.h" #include "column/column_hash.h" #include "column/column_helper.h" @@ -162,6 +164,10 @@ class SimdBlockFilter { // we can use can_use() to check if this bloom filter can be used bool can_use() const { return _directory != nullptr; } + size_t get_alloc_size() const { + return _log_num_buckets == 0 ? 0 : (1ull << (_log_num_buckets + LOG_BUCKET_BYTE_SIZE)); + } + private: // The number of bits to set in a tiny Bloom filter block @@ -187,10 +193,6 @@ class SimdBlockFilter { // log2(number of bytes in a bucket): static constexpr int LOG_BUCKET_BYTE_SIZE = 5; - size_t get_alloc_size() const { - return _log_num_buckets == 0 ? 0 : (1ull << (_log_num_buckets + LOG_BUCKET_BYTE_SIZE)); - } - // Common: // log_num_buckets_ is the log (base 2) of the number of buckets in the directory: int _log_num_buckets = 0; @@ -292,6 +294,15 @@ class JoinRuntimeFilter { return _hash_partition_bf[0].can_use(); } + size_t bf_alloc_size() const { + if (_hash_partition_bf.empty()) { + return _bf.get_alloc_size(); + } + return std::accumulate( + _hash_partition_bf.begin(), _hash_partition_bf.end(), 0ull, + [](size_t total, const SimdBlockFilter& bf) -> size_t { return total + bf.get_alloc_size(); }); + } + // RuntimeFilter version // if the RuntimeFilter is updated, the version will be updated as well, // (usually used for TopN Filter) diff --git a/be/src/runtime/runtime_filter_worker.cpp b/be/src/runtime/runtime_filter_worker.cpp index c9b2f3db5ae17..7725e46bf75ac 100644 --- a/be/src/runtime/runtime_filter_worker.cpp +++ b/be/src/runtime/runtime_filter_worker.cpp @@ -191,6 +191,7 @@ void RuntimeFilterPort::receive_shared_runtime_filter(int32_t filter_id, rf_desc->set_shared_runtime_filter(rf); } } + RuntimeFilterMerger::RuntimeFilterMerger(ExecEnv* env, const UniqueId& query_id, const TQueryOptions& query_options, bool is_pipeline) : _exec_env(env), _query_id(query_id), _query_options(query_options), _is_pipeline(is_pipeline) {} @@ -469,15 +470,6 @@ void RuntimeFilterMerger::_send_total_runtime_filter(int rf_version, int32_t fil pool->clear(); } -enum EventType { - RECEIVE_TOTAL_RF = 0, - CLOSE_QUERY = 1, - OPEN_QUERY = 2, - RECEIVE_PART_RF = 3, - SEND_PART_RF = 4, - SEND_BROADCAST_GRF = 5, -}; - struct RuntimeFilterWorkerEvent { public: RuntimeFilterWorkerEvent() = default; @@ -504,9 +496,19 @@ static_assert(std::is_move_assignable::value); RuntimeFilterWorker::RuntimeFilterWorker(ExecEnv* env) : _exec_env(env), _thread([this] { execute(); }) { Thread::set_thread_name(_thread, "runtime_filter"); + _metrics = new RuntimeFilterWorkerMetrics(); } RuntimeFilterWorker::~RuntimeFilterWorker() { +<<<<<<< HEAD +======= + if (_metrics) { + delete _metrics; + } +} + +void RuntimeFilterWorker::close() { +>>>>>>> 16ec9c16fe ([Enhancement] add more profile and metrics to measure runtime bloom filter size (#46360)) _queue.shutdown(); _thread.join(); } @@ -520,6 +522,7 @@ void RuntimeFilterWorker::open_query(const TUniqueId& query_id, const TQueryOpti ev.query_options = query_options; ev.create_rf_merger_request = params; ev.is_opened_by_pipeline = is_pipeline; + _metrics->update_event_nums(ev.type, 1); _queue.put(std::move(ev)); } @@ -528,6 +531,7 @@ void RuntimeFilterWorker::close_query(const TUniqueId& query_id) { RuntimeFilterWorkerEvent ev; ev.type = CLOSE_QUERY; ev.query_id = query_id; + _metrics->update_event_nums(ev.type, 1); _queue.put(std::move(ev)); } @@ -539,6 +543,8 @@ void RuntimeFilterWorker::send_part_runtime_filter(PTransmitRuntimeFilterParams& ev.transmit_timeout_ms = timeout_ms; ev.transmit_addrs = addrs; ev.transmit_rf_request = std::move(params); + _metrics->update_event_nums(ev.type, 1); + _metrics->update_rf_bytes(ev.type, ev.transmit_rf_request.data().size()); _queue.put(std::move(ev)); } @@ -551,6 +557,8 @@ void RuntimeFilterWorker::send_broadcast_runtime_filter(PTransmitRuntimeFilterPa ev.transmit_timeout_ms = timeout_ms; ev.destinations = destinations; ev.transmit_rf_request = std::move(params); + _metrics->update_event_nums(ev.type, 1); + _metrics->update_rf_bytes(ev.type, ev.transmit_rf_request.data().size()); _queue.put(std::move(ev)); } @@ -571,8 +579,11 @@ void RuntimeFilterWorker::receive_runtime_filter(const PTransmitRuntimeFilterPar ev.query_id.hi = params.query_id().hi(); ev.query_id.lo = params.query_id().lo(); ev.transmit_rf_request = params; + _metrics->update_event_nums(ev.type, 1); + _metrics->update_rf_bytes(ev.type, ev.transmit_rf_request.data().size()); _queue.put(std::move(ev)); } + // receive total runtime filter in pipeline engine. static inline Status receive_total_runtime_filter_pipeline(PTransmitRuntimeFilterParams& params, const std::shared_ptr& shared_rf) { @@ -863,8 +874,17 @@ void RuntimeFilterWorker::execute() { if (!_queue.blocking_get(&ev)) { break; } +<<<<<<< HEAD +======= + + LOG_IF_EVERY_N(INFO, _queue.get_size() > CpuInfo::num_cores() * 10, 10) + << "runtime filter worker queue may be too large, size: " << _queue.get_size(); + + _metrics->update_event_nums(ev.type, -1); +>>>>>>> 16ec9c16fe ([Enhancement] add more profile and metrics to measure runtime bloom filter size (#46360)) switch (ev.type) { case RECEIVE_TOTAL_RF: { + _metrics->update_rf_bytes(ev.type, -ev.transmit_rf_request.data().size()); _receive_total_runtime_filter(ev.transmit_rf_request); break; } @@ -894,6 +914,7 @@ void RuntimeFilterWorker::execute() { } case RECEIVE_PART_RF: { + _metrics->update_rf_bytes(ev.type, -ev.transmit_rf_request.data().size()); auto it = _mergers.find(ev.query_id); if (it == _mergers.end()) { VLOG_QUERY << "receive part rf: rf merger not existed. query_id = " << ev.query_id; @@ -907,11 +928,13 @@ void RuntimeFilterWorker::execute() { } case SEND_PART_RF: { + _metrics->update_rf_bytes(ev.type, -ev.transmit_rf_request.data().size()); _deliver_part_runtime_filter(std::move(ev.transmit_addrs), std::move(ev.transmit_rf_request), ev.transmit_timeout_ms); break; } case SEND_BROADCAST_GRF: { + _metrics->update_rf_bytes(ev.type, -ev.transmit_rf_request.data().size()); _process_send_broadcast_runtime_filter_event(std::move(ev.transmit_rf_request), std::move(ev.destinations), ev.transmit_timeout_ms); break; diff --git a/be/src/runtime/runtime_filter_worker.h b/be/src/runtime/runtime_filter_worker.h index 95022672b6130..8fb39dfdf9fb2 100644 --- a/be/src/runtime/runtime_filter_worker.h +++ b/be/src/runtime/runtime_filter_worker.h @@ -29,6 +29,7 @@ #include "gen_cpp/internal_service.pb.h" #include "util/blocking_queue.hpp" #include "util/ref_count_closure.h" +#include "util/system_metrics.h" #include "util/uid_util.h" namespace starrocks { @@ -112,6 +113,35 @@ class RuntimeFilterMerger { const bool _is_pipeline; }; +enum EventType { + RECEIVE_TOTAL_RF = 0, + CLOSE_QUERY = 1, + OPEN_QUERY = 2, + RECEIVE_PART_RF = 3, + SEND_PART_RF = 4, + SEND_BROADCAST_GRF = 5, + MAX_COUNT, +}; + +inline std::string EventTypeToString(EventType type) { + switch (type) { + case RECEIVE_TOTAL_RF: + return "RECEIVE_TOTAL_RF"; + case CLOSE_QUERY: + return "CLOSE_QUERY"; + case OPEN_QUERY: + return "OPEN_QUERY"; + case RECEIVE_PART_RF: + return "RECEIVE_PART_RF"; + case SEND_PART_RF: + return "SEND_PART_RF"; + case SEND_BROADCAST_GRF: + return "SEND_BROADCAST_GRF"; + default: + break; + } + __builtin_unreachable(); +} // RuntimeFilterWorker works in a separated thread, and does following jobs: // 1. deserialize runtime filters. // 2. merge runtime filters. @@ -122,7 +152,21 @@ class RuntimeFilterMerger { // - receive total RF and send it to RuntimeFilterPort // - send partitioned RF(for hash join node) // - close a query(delete runtime filter merger) +<<<<<<< HEAD class RuntimeFilterWorkerEvent; +======= +struct RuntimeFilterWorkerEvent; + +struct RuntimeFilterWorkerMetrics { + void update_event_nums(EventType event_type, int64_t delta) { event_nums[event_type] += delta; } + + void update_rf_bytes(EventType event_type, int64_t delta) { runtime_filter_bytes[event_type] += delta; } + + std::array event_nums{}; + std::array runtime_filter_bytes{}; +}; + +>>>>>>> 16ec9c16fe ([Enhancement] add more profile and metrics to measure runtime bloom filter size (#46360)) class RuntimeFilterWorker { public: RuntimeFilterWorker(ExecEnv* env); @@ -136,7 +180,15 @@ class RuntimeFilterWorker { void send_part_runtime_filter(PTransmitRuntimeFilterParams&& params, const std::vector& addrs, int timeout_ms); void send_broadcast_runtime_filter(PTransmitRuntimeFilterParams&& params, +<<<<<<< HEAD const std::vector& destinations, int timeout_ms); +======= + const std::vector& destinations, int timeout_ms, + int64_t rpc_http_min_size); + + size_t queue_size() const; + const RuntimeFilterWorkerMetrics* metrics() const { return _metrics; } +>>>>>>> 16ec9c16fe ([Enhancement] add more profile and metrics to measure runtime bloom filter size (#46360)) private: void _receive_total_runtime_filter(PTransmitRuntimeFilterParams& params); @@ -158,6 +210,7 @@ class RuntimeFilterWorker { std::unordered_map _mergers; ExecEnv* _exec_env; std::thread _thread; + RuntimeFilterWorkerMetrics* _metrics = nullptr; }; }; // namespace starrocks diff --git a/be/src/util/system_metrics.cpp b/be/src/util/system_metrics.cpp index 256a1aedc6664..3e7aeff816d30 100644 --- a/be/src/util/system_metrics.cpp +++ b/be/src/util/system_metrics.cpp @@ -46,6 +46,12 @@ #include "column/column_pool.h" #include "gutil/strings/split.h" // for string split #include "gutil/strtoint.h" // for atoi64 +<<<<<<< HEAD +======= +#include "jemalloc/jemalloc.h" +#include "runtime/runtime_filter_worker.h" +#include "util/metrics.h" +>>>>>>> 16ec9c16fe ([Enhancement] add more profile and metrics to measure runtime bloom filter size (#46360)) namespace starrocks { @@ -121,6 +127,12 @@ class QueryCacheMetrics { METRIC_DEFINE_DOUBLE_GAUGE(query_cache_hit_ratio, MetricUnit::PERCENT); }; +class RuntimeFilterMetrics { +public: + METRIC_DEFINE_INT_GAUGE(runtime_filter_events_in_queue, MetricUnit::NOUNIT); + METRIC_DEFINE_INT_GAUGE(runtime_filter_bytes_in_queue, MetricUnit::BYTES); +}; + SystemMetrics::SystemMetrics() = default; SystemMetrics::~SystemMetrics() { @@ -138,6 +150,9 @@ SystemMetrics::~SystemMetrics() { if (_line_ptr != nullptr) { free(_line_ptr); } + for (auto& it : _runtime_filter_metrics) { + delete it.second; + } } void SystemMetrics::install(MetricRegistry* registry, const std::set& disk_devices, @@ -153,6 +168,7 @@ void SystemMetrics::install(MetricRegistry* registry, const std::setregister_metric("query_cache_hit_ratio", &_query_cache_metrics->query_cache_hit_ratio); } +void SystemMetrics::_install_runtime_filter_metrics(starrocks::MetricRegistry* registry) { + for (int i = 0; i < EventType::MAX_COUNT; i++) { + auto* metrics = new RuntimeFilterMetrics(); + const auto& type = EventTypeToString((EventType)i); +#define REGISTER_RUNTIME_FILTER_METRIC(name) \ + registry->register_metric(#name, MetricLabels().add("type", type), &metrics->name) + REGISTER_RUNTIME_FILTER_METRIC(runtime_filter_events_in_queue); + REGISTER_RUNTIME_FILTER_METRIC(runtime_filter_bytes_in_queue); + _runtime_filter_metrics.emplace(type, metrics); + } +} + +void SystemMetrics::_update_runtime_filter_metrics() { + auto* runtime_filter_worker = ExecEnv::GetInstance()->runtime_filter_worker(); + if (UNLIKELY(runtime_filter_worker == nullptr)) { + return; + } + const auto* metrics = runtime_filter_worker->metrics(); + for (int i = 0; i < EventType::MAX_COUNT; i++) { + const auto& event_name = EventTypeToString((EventType)i); + auto iter = _runtime_filter_metrics.find(event_name); + if (iter == _runtime_filter_metrics.end()) { + continue; + } + iter->second->runtime_filter_events_in_queue.set_value(metrics->event_nums[i]); + iter->second->runtime_filter_bytes_in_queue.set_value(metrics->runtime_filter_bytes[i]); + } +} + void SystemMetrics::_update_fd_metrics() { #ifdef BE_TEST FILE* fp = fopen(k_ut_fd_path, "r"); diff --git a/be/src/util/system_metrics.h b/be/src/util/system_metrics.h index 62eec51000c9e..4bb724ac5368a 100644 --- a/be/src/util/system_metrics.h +++ b/be/src/util/system_metrics.h @@ -30,6 +30,7 @@ class NetMetrics; class FileDescriptorMetrics; class SnmpMetrics; class QueryCacheMetrics; +class RuntimeFilterMetrics; class MemoryMetrics { public: @@ -142,6 +143,10 @@ class SystemMetrics { void _update_query_cache_metrics(); + void _install_runtime_filter_metrics(MetricRegistry* registry); + + void _update_runtime_filter_metrics(); + private: static const char* const _s_hook_name; @@ -151,6 +156,7 @@ class SystemMetrics { std::map _net_metrics; std::unique_ptr _fd_metrics; std::unique_ptr _query_cache_metrics; + std::map _runtime_filter_metrics; int _proc_net_dev_version = 0; std::unique_ptr _snmp_metrics;