Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Enhancement] add more profile and metrics to measure runtime bloom filter size #46360

Merged
merged 3 commits into from
May 29, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions be/src/exec/hash_joiner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ void HashJoinBuildMetrics::prepare(RuntimeProfile* runtime_profile) {
runtime_filter_num = ADD_COUNTER(runtime_profile, "RuntimeFilterNum", TUnit::UNIT);
build_keys_per_bucket = ADD_COUNTER(runtime_profile, "BuildKeysPerBucket%", TUnit::UNIT);
hash_table_memory_usage = ADD_COUNTER(runtime_profile, "HashTableMemoryUsage", TUnit::BYTES);

partial_runtime_bloom_filter_bytes = ADD_COUNTER(runtime_profile, "PartialRuntimeBloomFilterBytes", TUnit::BYTES);
}

HashJoiner::HashJoiner(const HashJoinerParam& param)
Expand Down
2 changes: 2 additions & 0 deletions be/src/exec/hash_joiner.h
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,8 @@ struct HashJoinBuildMetrics {
RuntimeProfile::Counter* build_keys_per_bucket = nullptr;
RuntimeProfile::Counter* hash_table_memory_usage = nullptr;

RuntimeProfile::Counter* partial_runtime_bloom_filter_bytes = nullptr;

void prepare(RuntimeProfile* runtime_profile);
};

Expand Down
12 changes: 12 additions & 0 deletions be/src/exec/pipeline/hashjoin/hash_join_build_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@

#include "exec/pipeline/hashjoin/hash_join_build_operator.h"

#include <numeric>
#include <utility>

#include "exec/pipeline/query_context.h"
#include "exprs/runtime_filter_bank.h"
#include "runtime/current_thread.h"
#include "runtime/runtime_filter_worker.h"
#include "util/race_detect.h"
Expand Down Expand Up @@ -142,6 +144,16 @@ Status HashJoinBuildOperator::set_finishing(RuntimeState* state) {
auto&& in_filters = _partial_rf_merger->get_total_in_filters();
auto&& bloom_filters = _partial_rf_merger->get_total_bloom_filters();

{
size_t total_bf_bytes = std::accumulate(bloom_filters.begin(), bloom_filters.end(), 0ull,
[](size_t total, RuntimeFilterBuildDescriptor* desc) -> size_t {
auto rf = desc->runtime_filter();
total += (rf == nullptr ? 0 : rf->bf_alloc_size());
return total;
});
COUNTER_UPDATE(_join_builder->build_metrics().partial_runtime_bloom_filter_bytes, total_bf_bytes);
}

// publish runtime bloom-filters
state->runtime_filter_port()->publish_runtime_filters(bloom_filters);
// move runtime filters into RuntimeFilterHub.
Expand Down
19 changes: 15 additions & 4 deletions be/src/exprs/runtime_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

#pragma once

#include <numeric>

#include "column/chunk.h"
#include "column/column_hash.h"
#include "column/column_helper.h"
Expand Down Expand Up @@ -182,6 +184,10 @@ class SimdBlockFilter {
// we can use can_use() to check if this bloom filter can be used
bool can_use() const { return _directory != nullptr; }

size_t get_alloc_size() const {
return _log_num_buckets == 0 ? 0 : (1ull << (_log_num_buckets + LOG_BUCKET_BYTE_SIZE));
}

private:
// The number of bits to set in a tiny Bloom filter block

Expand Down Expand Up @@ -224,10 +230,6 @@ class SimdBlockFilter {
// log2(number of bytes in a bucket):
static constexpr int LOG_BUCKET_BYTE_SIZE = 5;

size_t get_alloc_size() const {
return _log_num_buckets == 0 ? 0 : (1ull << (_log_num_buckets + LOG_BUCKET_BYTE_SIZE));
}

// Common:
// log_num_buckets_ is the log (base 2) of the number of buckets in the directory:
int _log_num_buckets = 0;
Expand Down Expand Up @@ -332,6 +334,15 @@ class JoinRuntimeFilter {
return _hash_partition_bf[0].can_use();
}

size_t bf_alloc_size() const {
if (_hash_partition_bf.empty()) {
return _bf.get_alloc_size();
}
return std::accumulate(
_hash_partition_bf.begin(), _hash_partition_bf.end(), 0ull,
[](size_t total, const SimdBlockFilter& bf) -> size_t { return total + bf.get_alloc_size(); });
}

// RuntimeFilter version
// if the RuntimeFilter is updated, the version will be updated as well,
// (usually used for TopN Filter)
Expand Down
32 changes: 22 additions & 10 deletions be/src/runtime/runtime_filter_worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ void RuntimeFilterPort::receive_shared_runtime_filter(int32_t filter_id,
rf_desc->set_shared_runtime_filter(rf);
}
}

RuntimeFilterMerger::RuntimeFilterMerger(ExecEnv* env, const UniqueId& query_id, const TQueryOptions& query_options,
bool is_pipeline)
: _exec_env(env), _query_id(query_id), _query_options(query_options), _is_pipeline(is_pipeline) {}
Expand Down Expand Up @@ -496,15 +497,6 @@ void RuntimeFilterMerger::_send_total_runtime_filter(int rf_version, int32_t fil
pool->clear();
}

enum EventType {
RECEIVE_TOTAL_RF = 0,
CLOSE_QUERY = 1,
OPEN_QUERY = 2,
RECEIVE_PART_RF = 3,
SEND_PART_RF = 4,
SEND_BROADCAST_GRF = 5,
};

struct RuntimeFilterWorkerEvent {
public:
RuntimeFilterWorkerEvent() = default;
Expand Down Expand Up @@ -532,9 +524,14 @@ static_assert(std::is_move_assignable<RuntimeFilterWorkerEvent>::value);

RuntimeFilterWorker::RuntimeFilterWorker(ExecEnv* env) : _exec_env(env), _thread([this] { execute(); }) {
Thread::set_thread_name(_thread, "runtime_filter");
_metrics = new RuntimeFilterWorkerMetrics();
}

RuntimeFilterWorker::~RuntimeFilterWorker() = default;
RuntimeFilterWorker::~RuntimeFilterWorker() {
if (_metrics) {
delete _metrics;
}
}

void RuntimeFilterWorker::close() {
_queue.shutdown();
Expand All @@ -550,6 +547,7 @@ void RuntimeFilterWorker::open_query(const TUniqueId& query_id, const TQueryOpti
ev.query_options = query_options;
ev.create_rf_merger_request = params;
ev.is_opened_by_pipeline = is_pipeline;
_metrics->update_event_nums(ev.type, 1);
_queue.put(std::move(ev));
}

Expand All @@ -558,6 +556,7 @@ void RuntimeFilterWorker::close_query(const TUniqueId& query_id) {
RuntimeFilterWorkerEvent ev;
ev.type = CLOSE_QUERY;
ev.query_id = query_id;
_metrics->update_event_nums(ev.type, 1);
_queue.put(std::move(ev));
}

Expand All @@ -571,6 +570,8 @@ void RuntimeFilterWorker::send_part_runtime_filter(PTransmitRuntimeFilterParams&
ev.transmit_via_http_min_size = rpc_http_min_size;
ev.transmit_addrs = addrs;
ev.transmit_rf_request = std::move(params);
_metrics->update_event_nums(ev.type, 1);
_metrics->update_rf_bytes(ev.type, ev.transmit_rf_request.data().size());
_queue.put(std::move(ev));
}

Expand All @@ -584,6 +585,8 @@ void RuntimeFilterWorker::send_broadcast_runtime_filter(PTransmitRuntimeFilterPa
ev.transmit_via_http_min_size = rpc_http_min_size;
ev.destinations = destinations;
ev.transmit_rf_request = std::move(params);
_metrics->update_event_nums(ev.type, 1);
_metrics->update_rf_bytes(ev.type, ev.transmit_rf_request.data().size());
_queue.put(std::move(ev));
}

Expand All @@ -604,8 +607,11 @@ void RuntimeFilterWorker::receive_runtime_filter(const PTransmitRuntimeFilterPar
ev.query_id.hi = params.query_id().hi();
ev.query_id.lo = params.query_id().lo();
ev.transmit_rf_request = params;
_metrics->update_event_nums(ev.type, 1);
_metrics->update_rf_bytes(ev.type, ev.transmit_rf_request.data().size());
_queue.put(std::move(ev));
}

// receive total runtime filter in pipeline engine.
static inline void receive_total_runtime_filter_pipeline(PTransmitRuntimeFilterParams& params,
const std::shared_ptr<JoinRuntimeFilter>& shared_rf) {
Expand Down Expand Up @@ -876,6 +882,7 @@ void RuntimeFilterWorker::_deliver_part_runtime_filter(std::vector<TNetworkAddre
void RuntimeFilterWorker::execute() {
LOG(INFO) << "RuntimeFilterWorker start working.";
for (;;) {
// @TODO pending queue size, total rf size
RuntimeFilterWorkerEvent ev;
if (!_queue.blocking_get(&ev)) {
break;
Expand All @@ -884,8 +891,10 @@ void RuntimeFilterWorker::execute() {
LOG_IF_EVERY_N(INFO, _queue.get_size() > CpuInfo::num_cores() * 10, 10)
<< "runtime filter worker queue may be too large, size: " << _queue.get_size();

_metrics->update_event_nums(ev.type, -1);
switch (ev.type) {
case RECEIVE_TOTAL_RF: {
_metrics->update_rf_bytes(ev.type, -ev.transmit_rf_request.data().size());
_receive_total_runtime_filter(ev.transmit_rf_request);
break;
}
Expand Down Expand Up @@ -915,6 +924,7 @@ void RuntimeFilterWorker::execute() {
}

case RECEIVE_PART_RF: {
_metrics->update_rf_bytes(ev.type, -ev.transmit_rf_request.data().size());
auto it = _mergers.find(ev.query_id);
if (it == _mergers.end()) {
VLOG_QUERY << "receive part rf: rf merger not existed. query_id = " << ev.query_id;
Expand All @@ -928,11 +938,13 @@ void RuntimeFilterWorker::execute() {
}

case SEND_PART_RF: {
_metrics->update_rf_bytes(ev.type, -ev.transmit_rf_request.data().size());
_deliver_part_runtime_filter(std::move(ev.transmit_addrs), std::move(ev.transmit_rf_request),
ev.transmit_timeout_ms, ev.transmit_via_http_min_size);
break;
}
case SEND_BROADCAST_GRF: {
_metrics->update_rf_bytes(ev.type, -ev.transmit_rf_request.data().size());
_process_send_broadcast_runtime_filter_event(std::move(ev.transmit_rf_request), std::move(ev.destinations),
ev.transmit_timeout_ms, ev.transmit_via_http_min_size);
break;
silverbullet233 marked this conversation as resolved.
Show resolved Hide resolved
Expand Down
50 changes: 50 additions & 0 deletions be/src/runtime/runtime_filter_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include "gen_cpp/internal_service.pb.h"
#include "util/blocking_queue.hpp"
#include "util/ref_count_closure.h"
#include "util/system_metrics.h"
#include "util/uid_util.h"
namespace starrocks {

Expand Down Expand Up @@ -113,6 +114,35 @@ class RuntimeFilterMerger {
const bool _is_pipeline;
};

enum EventType {
RECEIVE_TOTAL_RF = 0,
CLOSE_QUERY = 1,
OPEN_QUERY = 2,
RECEIVE_PART_RF = 3,
SEND_PART_RF = 4,
SEND_BROADCAST_GRF = 5,
MAX_COUNT,
};

inline std::string EventTypeToString(EventType type) {
switch (type) {
case RECEIVE_TOTAL_RF:
return "RECEIVE_TOTAL_RF";
case CLOSE_QUERY:
return "CLOSE_QUERY";
case OPEN_QUERY:
return "OPEN_QUERY";
case RECEIVE_PART_RF:
return "RECEIVE_PART_RF";
case SEND_PART_RF:
return "SEND_PART_RF";
case SEND_BROADCAST_GRF:
return "SEND_BROADCAST_GRF";
default:
break;
}
__builtin_unreachable();
}
// RuntimeFilterWorker works in a separated thread, and does following jobs:
// 1. deserialize runtime filters.
// 2. merge runtime filters.
Expand All @@ -124,6 +154,24 @@ class RuntimeFilterMerger {
// - send partitioned RF(for hash join node)
// - close a query(delete runtime filter merger)
struct RuntimeFilterWorkerEvent;

struct RuntimeFilterWorkerMetrics {
void update_event_nums(EventType event_type, int64_t delta) {
LOG(INFO) << "update event num, type: " << EventTypeToString(event_type)
<< ", old: " << runtime_filter_bytes[event_type] << ", delta:" << delta;
event_nums[event_type] += delta;
}

void update_rf_bytes(EventType event_type, int64_t delta) {
LOG(INFO) << "update rf bytes, type: " << EventTypeToString(event_type)
<< ", old: " << runtime_filter_bytes[event_type] << ", delta:" << delta;
runtime_filter_bytes[event_type] += delta;
}

std::array<std::atomic_int64_t, EventType::MAX_COUNT> event_nums{};
std::array<std::atomic_int64_t, EventType::MAX_COUNT> runtime_filter_bytes{};
};

class RuntimeFilterWorker {
public:
RuntimeFilterWorker(ExecEnv* env);
Expand All @@ -143,6 +191,7 @@ class RuntimeFilterWorker {
int64_t rpc_http_min_size);

size_t queue_size() const;
const RuntimeFilterWorkerMetrics* metrics() const { return _metrics; }

private:
void _receive_total_runtime_filter(PTransmitRuntimeFilterParams& params);
Expand All @@ -166,6 +215,7 @@ class RuntimeFilterWorker {
std::unordered_map<TUniqueId, RuntimeFilterMerger> _mergers;
ExecEnv* _exec_env;
std::thread _thread;
RuntimeFilterWorkerMetrics* _metrics = nullptr;
};

}; // namespace starrocks
silverbullet233 marked this conversation as resolved.
Show resolved Hide resolved
42 changes: 42 additions & 0 deletions be/src/util/system_metrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@
#include "gutil/strings/split.h" // for string split
#include "gutil/strtoint.h" // for atoi64
#include "jemalloc/jemalloc.h"
#include "runtime/runtime_filter_worker.h"
#include "util/metrics.h"

namespace starrocks {

Expand Down Expand Up @@ -118,6 +120,12 @@ class QueryCacheMetrics {
METRIC_DEFINE_DOUBLE_GAUGE(query_cache_hit_ratio, MetricUnit::PERCENT);
};

class RuntimeFilterMetrics {
public:
METRIC_DEFINE_INT_GAUGE(runtime_filter_events_in_queue, MetricUnit::NOUNIT);
METRIC_DEFINE_INT_GAUGE(runtime_filter_bytes_in_queue, MetricUnit::BYTES);
};

SystemMetrics::SystemMetrics() = default;

SystemMetrics::~SystemMetrics() {
Expand All @@ -135,6 +143,9 @@ SystemMetrics::~SystemMetrics() {
if (_line_ptr != nullptr) {
free(_line_ptr);
}
for (auto& it : _runtime_filter_metrics) {
delete it.second;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

may consider using std::unique_ptr to manage the pointers instead of using the raw one. so the resource deallocation will be as easy as _runtime_filter_metrics.clear()

}
}

void SystemMetrics::install(MetricRegistry* registry, const std::set<std::string>& disk_devices,
Expand All @@ -150,6 +161,7 @@ void SystemMetrics::install(MetricRegistry* registry, const std::set<std::string
_install_fd_metrics(registry);
_install_snmp_metrics(registry);
_install_query_cache_metrics(registry);
_install_runtime_filter_metrics(registry);
_registry = registry;
}

Expand All @@ -161,6 +173,7 @@ void SystemMetrics::update() {
_update_fd_metrics();
_update_snmp_metrics();
_update_query_cache_metrics();
_update_runtime_filter_metrics();
}

void SystemMetrics::_install_cpu_metrics(MetricRegistry* registry) {
Expand Down Expand Up @@ -650,6 +663,35 @@ void SystemMetrics::_install_query_cache_metrics(starrocks::MetricRegistry* regi
registry->register_metric("query_cache_hit_ratio", &_query_cache_metrics->query_cache_hit_ratio);
}

void SystemMetrics::_install_runtime_filter_metrics(starrocks::MetricRegistry* registry) {
for (int i = 0; i < EventType::MAX_COUNT; i++) {
auto* metrics = new RuntimeFilterMetrics();
const auto& type = EventTypeToString((EventType)i);
#define REGISTER_RUNTIME_FILTER_METRIC(name) \
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe better move this macro out of the function body, favor for code readability.

registry->register_metric(#name, MetricLabels().add("type", type), &metrics->name)
REGISTER_RUNTIME_FILTER_METRIC(runtime_filter_events_in_queue);
REGISTER_RUNTIME_FILTER_METRIC(runtime_filter_bytes_in_queue);
_runtime_filter_metrics.emplace(type, metrics);
}
}

void SystemMetrics::_update_runtime_filter_metrics() {
auto* runtime_filter_worker = ExecEnv::GetInstance()->runtime_filter_worker();
if (UNLIKELY(runtime_filter_worker == nullptr)) {
return;
}
const auto* metrics = runtime_filter_worker->metrics();
for (int i = 0; i < EventType::MAX_COUNT; i++) {
const auto& event_name = EventTypeToString((EventType)i);
auto iter = _runtime_filter_metrics.find(event_name);
if (iter == _runtime_filter_metrics.end()) {
continue;
}
iter->second->runtime_filter_events_in_queue.set_value(metrics->event_nums[i]);
iter->second->runtime_filter_bytes_in_queue.set_value(metrics->runtime_filter_bytes[i]);
}
}

void SystemMetrics::_update_fd_metrics() {
#ifdef BE_TEST
FILE* fp = fopen(k_ut_fd_path, "r");
silverbullet233 marked this conversation as resolved.
Show resolved Hide resolved
Expand Down
Loading
Loading