Skip to content

Commit

Permalink
[Enhancement] add more profile and metrics to measure runtime bloom f…
Browse files Browse the repository at this point in the history
…ilter size (StarRocks#46360)

Signed-off-by: silverbullet233 <[email protected]>
  • Loading branch information
silverbullet233 committed May 30, 2024
1 parent 950b663 commit 99ca170
Show file tree
Hide file tree
Showing 8 changed files with 148 additions and 15 deletions.
2 changes: 2 additions & 0 deletions be/src/exec/hash_joiner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ void HashJoinBuildMetrics::prepare(RuntimeProfile* runtime_profile) {
build_buckets_counter = ADD_COUNTER(runtime_profile, "BuildBuckets", TUnit::UNIT);
runtime_filter_num = ADD_COUNTER(runtime_profile, "RuntimeFilterNum", TUnit::UNIT);
hash_table_memory_usage = ADD_COUNTER(runtime_profile, "HashTableMemoryUsage", TUnit::BYTES);

partial_runtime_bloom_filter_bytes = ADD_COUNTER(runtime_profile, "PartialRuntimeBloomFilterBytes", TUnit::BYTES);
}

HashJoiner::HashJoiner(const HashJoinerParam& param)
Expand Down
2 changes: 2 additions & 0 deletions be/src/exec/hash_joiner.h
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,8 @@ struct HashJoinBuildMetrics {
RuntimeProfile::Counter* runtime_filter_num = nullptr;
RuntimeProfile::Counter* hash_table_memory_usage = nullptr;

RuntimeProfile::Counter* partial_runtime_bloom_filter_bytes = nullptr;

void prepare(RuntimeProfile* runtime_profile);
};

Expand Down
11 changes: 11 additions & 0 deletions be/src/exec/pipeline/hashjoin/hash_join_build_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@

#include "exec/pipeline/hashjoin/hash_join_build_operator.h"

#include <numeric>
#include <utility>

#include "exec/pipeline/query_context.h"
#include "exprs/runtime_filter_bank.h"
#include "runtime/current_thread.h"
#include "runtime/runtime_filter_worker.h"
#include "util/race_detect.h"
Expand Down Expand Up @@ -120,6 +122,15 @@ Status HashJoinBuildOperator::set_finishing(RuntimeState* state) {
auto&& in_filters = _partial_rf_merger->get_total_in_filters();
auto&& bloom_filters = _partial_rf_merger->get_total_bloom_filters();

{
size_t total_bf_bytes = std::accumulate(bloom_filters.begin(), bloom_filters.end(), 0ull,
[](size_t total, RuntimeFilterBuildDescriptor* desc) -> size_t {
auto rf = desc->runtime_filter();
total += (rf == nullptr ? 0 : rf->bf_alloc_size());
return total;
});
COUNTER_UPDATE(_join_builder->build_metrics().partial_runtime_bloom_filter_bytes, total_bf_bytes);
}
// publish runtime bloom-filters
state->runtime_filter_port()->publish_runtime_filters(bloom_filters);
// move runtime filters into RuntimeFilterHub.
Expand Down
19 changes: 15 additions & 4 deletions be/src/exprs/runtime_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

#pragma once

#include <numeric>

#include "column/chunk.h"
#include "column/column_hash.h"
#include "column/column_helper.h"
Expand Down Expand Up @@ -180,6 +182,10 @@ class SimdBlockFilter {
// we can use can_use() to check if this bloom filter can be used
bool can_use() const { return _directory != nullptr; }

size_t get_alloc_size() const {
return _log_num_buckets == 0 ? 0 : (1ull << (_log_num_buckets + LOG_BUCKET_BYTE_SIZE));
}

private:
// The number of bits to set in a tiny Bloom filter block

Expand Down Expand Up @@ -222,10 +228,6 @@ class SimdBlockFilter {
// log2(number of bytes in a bucket):
static constexpr int LOG_BUCKET_BYTE_SIZE = 5;

size_t get_alloc_size() const {
return _log_num_buckets == 0 ? 0 : (1ull << (_log_num_buckets + LOG_BUCKET_BYTE_SIZE));
}

// Common:
// log_num_buckets_ is the log (base 2) of the number of buckets in the directory:
int _log_num_buckets = 0;
Expand Down Expand Up @@ -327,6 +329,15 @@ class JoinRuntimeFilter {
return _hash_partition_bf[0].can_use();
}

size_t bf_alloc_size() const {
if (_hash_partition_bf.empty()) {
return _bf.get_alloc_size();
}
return std::accumulate(
_hash_partition_bf.begin(), _hash_partition_bf.end(), 0ull,
[](size_t total, const SimdBlockFilter& bf) -> size_t { return total + bf.get_alloc_size(); });
}

// RuntimeFilter version
// if the RuntimeFilter is updated, the version will be updated as well,
// (usually used for TopN Filter)
Expand Down
35 changes: 25 additions & 10 deletions be/src/runtime/runtime_filter_worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ void RuntimeFilterPort::receive_shared_runtime_filter(int32_t filter_id,
rf_desc->set_shared_runtime_filter(rf);
}
}

RuntimeFilterMerger::RuntimeFilterMerger(ExecEnv* env, const UniqueId& query_id, const TQueryOptions& query_options,
bool is_pipeline)
: _exec_env(env), _query_id(query_id), _query_options(query_options), _is_pipeline(is_pipeline) {}
Expand Down Expand Up @@ -484,15 +485,6 @@ void RuntimeFilterMerger::_send_total_runtime_filter(int rf_version, int32_t fil
pool->clear();
}

enum EventType {
RECEIVE_TOTAL_RF = 0,
CLOSE_QUERY = 1,
OPEN_QUERY = 2,
RECEIVE_PART_RF = 3,
SEND_PART_RF = 4,
SEND_BROADCAST_GRF = 5,
};

struct RuntimeFilterWorkerEvent {
public:
RuntimeFilterWorkerEvent() = default;
Expand Down Expand Up @@ -520,9 +512,14 @@ static_assert(std::is_move_assignable<RuntimeFilterWorkerEvent>::value);

RuntimeFilterWorker::RuntimeFilterWorker(ExecEnv* env) : _exec_env(env), _thread([this] { execute(); }) {
Thread::set_thread_name(_thread, "runtime_filter");
_metrics = new RuntimeFilterWorkerMetrics();
}

RuntimeFilterWorker::~RuntimeFilterWorker() = default;
RuntimeFilterWorker::~RuntimeFilterWorker() {
if (_metrics) {
delete _metrics;
}
}

void RuntimeFilterWorker::close() {
_queue.shutdown();
Expand All @@ -538,6 +535,7 @@ void RuntimeFilterWorker::open_query(const TUniqueId& query_id, const TQueryOpti
ev.query_options = query_options;
ev.create_rf_merger_request = params;
ev.is_opened_by_pipeline = is_pipeline;
_metrics->update_event_nums(ev.type, 1);
_queue.put(std::move(ev));
}

Expand All @@ -546,6 +544,7 @@ void RuntimeFilterWorker::close_query(const TUniqueId& query_id) {
RuntimeFilterWorkerEvent ev;
ev.type = CLOSE_QUERY;
ev.query_id = query_id;
_metrics->update_event_nums(ev.type, 1);
_queue.put(std::move(ev));
}

Expand All @@ -559,6 +558,8 @@ void RuntimeFilterWorker::send_part_runtime_filter(PTransmitRuntimeFilterParams&
ev.transmit_via_http_min_size = rpc_http_min_size;
ev.transmit_addrs = addrs;
ev.transmit_rf_request = std::move(params);
_metrics->update_event_nums(ev.type, 1);
_metrics->update_rf_bytes(ev.type, ev.transmit_rf_request.data().size());
_queue.put(std::move(ev));
}

Expand All @@ -572,6 +573,8 @@ void RuntimeFilterWorker::send_broadcast_runtime_filter(PTransmitRuntimeFilterPa
ev.transmit_via_http_min_size = rpc_http_min_size;
ev.destinations = destinations;
ev.transmit_rf_request = std::move(params);
_metrics->update_event_nums(ev.type, 1);
_metrics->update_rf_bytes(ev.type, ev.transmit_rf_request.data().size());
_queue.put(std::move(ev));
}

Expand All @@ -592,8 +595,11 @@ void RuntimeFilterWorker::receive_runtime_filter(const PTransmitRuntimeFilterPar
ev.query_id.hi = params.query_id().hi();
ev.query_id.lo = params.query_id().lo();
ev.transmit_rf_request = params;
_metrics->update_event_nums(ev.type, 1);
_metrics->update_rf_bytes(ev.type, ev.transmit_rf_request.data().size());
_queue.put(std::move(ev));
}

// receive total runtime filter in pipeline engine.
static inline Status receive_total_runtime_filter_pipeline(PTransmitRuntimeFilterParams& params,
const std::shared_ptr<JoinRuntimeFilter>& shared_rf) {
Expand Down Expand Up @@ -868,8 +874,14 @@ void RuntimeFilterWorker::execute() {
if (!_queue.blocking_get(&ev)) {
break;
}

LOG_IF_EVERY_N(INFO, _queue.get_size() > CpuInfo::num_cores() * 10, 10)
<< "runtime filter worker queue may be too large, size: " << _queue.get_size();

_metrics->update_event_nums(ev.type, -1);
switch (ev.type) {
case RECEIVE_TOTAL_RF: {
_metrics->update_rf_bytes(ev.type, -ev.transmit_rf_request.data().size());
_receive_total_runtime_filter(ev.transmit_rf_request);
break;
}
Expand Down Expand Up @@ -899,6 +911,7 @@ void RuntimeFilterWorker::execute() {
}

case RECEIVE_PART_RF: {
_metrics->update_rf_bytes(ev.type, -ev.transmit_rf_request.data().size());
auto it = _mergers.find(ev.query_id);
if (it == _mergers.end()) {
VLOG_QUERY << "receive part rf: rf merger not existed. query_id = " << ev.query_id;
Expand All @@ -912,11 +925,13 @@ void RuntimeFilterWorker::execute() {
}

case SEND_PART_RF: {
_metrics->update_rf_bytes(ev.type, -ev.transmit_rf_request.data().size());
_deliver_part_runtime_filter(std::move(ev.transmit_addrs), std::move(ev.transmit_rf_request),
ev.transmit_timeout_ms, ev.transmit_via_http_min_size);
break;
}
case SEND_BROADCAST_GRF: {
_metrics->update_rf_bytes(ev.type, -ev.transmit_rf_request.data().size());
_process_send_broadcast_runtime_filter_event(std::move(ev.transmit_rf_request), std::move(ev.destinations),
ev.transmit_timeout_ms, ev.transmit_via_http_min_size);
break;
Expand Down
46 changes: 45 additions & 1 deletion be/src/runtime/runtime_filter_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include "gen_cpp/internal_service.pb.h"
#include "util/blocking_queue.hpp"
#include "util/ref_count_closure.h"
#include "util/system_metrics.h"
#include "util/uid_util.h"
namespace starrocks {

Expand Down Expand Up @@ -112,6 +113,35 @@ class RuntimeFilterMerger {
const bool _is_pipeline;
};

enum EventType {
RECEIVE_TOTAL_RF = 0,
CLOSE_QUERY = 1,
OPEN_QUERY = 2,
RECEIVE_PART_RF = 3,
SEND_PART_RF = 4,
SEND_BROADCAST_GRF = 5,
MAX_COUNT,
};

inline std::string EventTypeToString(EventType type) {
switch (type) {
case RECEIVE_TOTAL_RF:
return "RECEIVE_TOTAL_RF";
case CLOSE_QUERY:
return "CLOSE_QUERY";
case OPEN_QUERY:
return "OPEN_QUERY";
case RECEIVE_PART_RF:
return "RECEIVE_PART_RF";
case SEND_PART_RF:
return "SEND_PART_RF";
case SEND_BROADCAST_GRF:
return "SEND_BROADCAST_GRF";
default:
break;
}
__builtin_unreachable();
}
// RuntimeFilterWorker works in a separated thread, and does following jobs:
// 1. deserialize runtime filters.
// 2. merge runtime filters.
Expand All @@ -122,7 +152,17 @@ class RuntimeFilterMerger {
// - receive total RF and send it to RuntimeFilterPort
// - send partitioned RF(for hash join node)
// - close a query(delete runtime filter merger)
class RuntimeFilterWorkerEvent;
struct RuntimeFilterWorkerEvent;

struct RuntimeFilterWorkerMetrics {
void update_event_nums(EventType event_type, int64_t delta) { event_nums[event_type] += delta; }

void update_rf_bytes(EventType event_type, int64_t delta) { runtime_filter_bytes[event_type] += delta; }

std::array<std::atomic_int64_t, EventType::MAX_COUNT> event_nums{};
std::array<std::atomic_int64_t, EventType::MAX_COUNT> runtime_filter_bytes{};
};

class RuntimeFilterWorker {
public:
RuntimeFilterWorker(ExecEnv* env);
Expand All @@ -141,6 +181,9 @@ class RuntimeFilterWorker {
const std::vector<TRuntimeFilterDestination>& destinations, int timeout_ms,
int64_t rpc_http_min_size);

size_t queue_size() const;
const RuntimeFilterWorkerMetrics* metrics() const { return _metrics; }

private:
void _receive_total_runtime_filter(PTransmitRuntimeFilterParams& params);
void _process_send_broadcast_runtime_filter_event(PTransmitRuntimeFilterParams&& params,
Expand All @@ -163,6 +206,7 @@ class RuntimeFilterWorker {
std::unordered_map<TUniqueId, RuntimeFilterMerger> _mergers;
ExecEnv* _exec_env;
std::thread _thread;
RuntimeFilterWorkerMetrics* _metrics = nullptr;
};

}; // namespace starrocks
42 changes: 42 additions & 0 deletions be/src/util/system_metrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@
#include "gutil/strings/split.h" // for string split
#include "gutil/strtoint.h" // for atoi64
#include "jemalloc/jemalloc.h"
#include "runtime/runtime_filter_worker.h"
#include "util/metrics.h"

namespace starrocks {

Expand Down Expand Up @@ -118,6 +120,12 @@ class QueryCacheMetrics {
METRIC_DEFINE_DOUBLE_GAUGE(query_cache_hit_ratio, MetricUnit::PERCENT);
};

class RuntimeFilterMetrics {
public:
METRIC_DEFINE_INT_GAUGE(runtime_filter_events_in_queue, MetricUnit::NOUNIT);
METRIC_DEFINE_INT_GAUGE(runtime_filter_bytes_in_queue, MetricUnit::BYTES);
};

SystemMetrics::SystemMetrics() = default;

SystemMetrics::~SystemMetrics() {
Expand All @@ -135,6 +143,9 @@ SystemMetrics::~SystemMetrics() {
if (_line_ptr != nullptr) {
free(_line_ptr);
}
for (auto& it : _runtime_filter_metrics) {
delete it.second;
}
}

void SystemMetrics::install(MetricRegistry* registry, const std::set<std::string>& disk_devices,
Expand All @@ -150,6 +161,7 @@ void SystemMetrics::install(MetricRegistry* registry, const std::set<std::string
_install_fd_metrics(registry);
_install_snmp_metrics(registry);
_install_query_cache_metrics(registry);
_install_runtime_filter_metrics(registry);
_registry = registry;
}

Expand All @@ -161,6 +173,7 @@ void SystemMetrics::update() {
_update_fd_metrics();
_update_snmp_metrics();
_update_query_cache_metrics();
_update_runtime_filter_metrics();
}

void SystemMetrics::_install_cpu_metrics(MetricRegistry* registry) {
Expand Down Expand Up @@ -648,6 +661,35 @@ void SystemMetrics::_install_query_cache_metrics(starrocks::MetricRegistry* regi
registry->register_metric("query_cache_hit_ratio", &_query_cache_metrics->query_cache_hit_ratio);
}

void SystemMetrics::_install_runtime_filter_metrics(starrocks::MetricRegistry* registry) {
for (int i = 0; i < EventType::MAX_COUNT; i++) {
auto* metrics = new RuntimeFilterMetrics();
const auto& type = EventTypeToString((EventType)i);
#define REGISTER_RUNTIME_FILTER_METRIC(name) \
registry->register_metric(#name, MetricLabels().add("type", type), &metrics->name)
REGISTER_RUNTIME_FILTER_METRIC(runtime_filter_events_in_queue);
REGISTER_RUNTIME_FILTER_METRIC(runtime_filter_bytes_in_queue);
_runtime_filter_metrics.emplace(type, metrics);
}
}

void SystemMetrics::_update_runtime_filter_metrics() {
auto* runtime_filter_worker = ExecEnv::GetInstance()->runtime_filter_worker();
if (UNLIKELY(runtime_filter_worker == nullptr)) {
return;
}
const auto* metrics = runtime_filter_worker->metrics();
for (int i = 0; i < EventType::MAX_COUNT; i++) {
const auto& event_name = EventTypeToString((EventType)i);
auto iter = _runtime_filter_metrics.find(event_name);
if (iter == _runtime_filter_metrics.end()) {
continue;
}
iter->second->runtime_filter_events_in_queue.set_value(metrics->event_nums[i]);
iter->second->runtime_filter_bytes_in_queue.set_value(metrics->runtime_filter_bytes[i]);
}
}

void SystemMetrics::_update_fd_metrics() {
#ifdef BE_TEST
FILE* fp = fopen(k_ut_fd_path, "r");
Expand Down
Loading

0 comments on commit 99ca170

Please sign in to comment.