Skip to content

Commit

Permalink
[Enhancement] add more profile and metrics to measure runtime bloom f…
Browse files Browse the repository at this point in the history
…ilter size (StarRocks#46360)

Signed-off-by: silverbullet233 <[email protected]>
  • Loading branch information
silverbullet233 committed May 30, 2024
1 parent b9e144b commit 5157bc4
Show file tree
Hide file tree
Showing 8 changed files with 144 additions and 15 deletions.
9 changes: 9 additions & 0 deletions be/src/exec/pipeline/hashjoin/hash_join_build_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,15 @@ Status HashJoinBuildOperator::set_finishing(RuntimeState* state) {
auto&& in_filters = _partial_rf_merger->get_total_in_filters();
auto&& bloom_filters = _partial_rf_merger->get_total_bloom_filters();

{
size_t total_bf_bytes = std::accumulate(bloom_filters.begin(), bloom_filters.end(), 0ull,
[](size_t total, RuntimeBloomFilter* desc) -> size_t {
auto rf = desc->runtime_filter();
total += (rf == nullptr ? 0 : rf->bf_alloc_size());
return total;
});
_join_builder->update_partial_runtime_bloom_filter_bytes(total_bf_bytes);
}
// publish runtime bloom-filters
state->runtime_filter_port()->publish_runtime_filters(bloom_filters);
// move runtime filters into RuntimeFilterHub.
Expand Down
1 change: 1 addition & 0 deletions be/src/exec/vectorized/hash_joiner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ Status HashJoiner::prepare_builder(RuntimeState* state, RuntimeProfile* runtime_
_build_conjunct_evaluate_timer = ADD_TIMER(runtime_profile, "BuildConjunctEvaluateTime");
_build_buckets_counter = ADD_COUNTER_SKIP_MERGE(runtime_profile, "BuildBuckets", TUnit::UNIT);
_runtime_filter_num = ADD_COUNTER(runtime_profile, "RuntimeFilterNum", TUnit::UNIT);
_partial_runtime_bloom_filter_bytes = ADD_COUNTER(runtime_profile, "PartialRuntimeBloomFilterBytes", TUnit::BYTES);

HashTableParam param;
_init_hash_table_param(&param);
Expand Down
5 changes: 5 additions & 0 deletions be/src/exec/vectorized/hash_joiner.h
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,10 @@ class HashJoiner final : public pipeline::ContextWithDependency {
void set_prober_finished();
Columns string_key_columns() { return _string_key_columns; }

void update_partial_runtime_bloom_filter_bytes(int64_t total_bytes) {
COUNTER_UPDATE(_partial_runtime_bloom_filter_bytes, total_bytes);
}

private:
static bool _has_null(const ColumnPtr& column);

Expand Down Expand Up @@ -384,6 +388,7 @@ class HashJoiner final : public pipeline::ContextWithDependency {
RuntimeProfile::Counter* _output_build_column_timer = nullptr;
RuntimeProfile::Counter* _build_buckets_counter = nullptr;
RuntimeProfile::Counter* _runtime_filter_num = nullptr;
RuntimeProfile::Counter* _partial_runtime_bloom_filter_bytes = nullptr;

// Profile for hash join prober.
RuntimeProfile::Counter* _search_ht_timer = nullptr;
Expand Down
17 changes: 13 additions & 4 deletions be/src/exprs/vectorized/runtime_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,10 @@ class SimdBlockFilter {
// we can use can_use() to check if this bloom filter can be used
bool can_use() const { return _directory != nullptr; }

size_t get_alloc_size() const {
return _log_num_buckets == 0 ? 0 : (1ull << (_log_num_buckets + LOG_BUCKET_BYTE_SIZE));
}

private:
// The number of bits to set in a tiny Bloom filter block

Expand All @@ -139,10 +143,6 @@ class SimdBlockFilter {
// log2(number of bytes in a bucket):
static constexpr int LOG_BUCKET_BYTE_SIZE = 5;

size_t get_alloc_size() const {
return _log_num_buckets == 0 ? 0 : (1ull << (_log_num_buckets + LOG_BUCKET_BYTE_SIZE));
}

// Common:
// log_num_buckets_ is the log (base 2) of the number of buckets in the directory:
int _log_num_buckets = 0;
Expand Down Expand Up @@ -243,6 +243,15 @@ class JoinRuntimeFilter {
return _hash_partition_bf[0].can_use();
}

size_t bf_alloc_size() const {
if (_hash_partition_bf.empty()) {
return _bf.get_alloc_size();
}
return std::accumulate(
_hash_partition_bf.begin(), _hash_partition_bf.end(), 0ull,
[](size_t total, const SimdBlockFilter& bf) -> size_t { return total + bf.get_alloc_size(); });
}

virtual size_t max_serialized_size() const;
virtual size_t serialize(uint8_t* data) const;
virtual size_t deserialize(const uint8_t* data);
Expand Down
32 changes: 23 additions & 9 deletions be/src/runtime/runtime_filter_worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ void RuntimeFilterPort::receive_shared_runtime_filter(int32_t filter_id,
rf_desc->set_shared_runtime_filter(rf);
}
}

RuntimeFilterMerger::RuntimeFilterMerger(ExecEnv* env, const UniqueId& query_id, const TQueryOptions& query_options,
bool is_pipeline)
: _exec_env(env), _query_id(query_id), _query_options(query_options), _is_pipeline(is_pipeline) {}
Expand Down Expand Up @@ -457,15 +458,6 @@ void RuntimeFilterMerger::_send_total_runtime_filter(int32_t filter_id) {
pool->clear();
}

enum EventType {
RECEIVE_TOTAL_RF = 0,
CLOSE_QUERY = 1,
OPEN_QUERY = 2,
RECEIVE_PART_RF = 3,
SEND_PART_RF = 4,
SEND_BROADCAST_GRF = 5,
};

struct RuntimeFilterWorkerEvent {
public:
RuntimeFilterWorkerEvent() = default;
Expand All @@ -492,11 +484,15 @@ static_assert(std::is_move_assignable<RuntimeFilterWorkerEvent>::value);

RuntimeFilterWorker::RuntimeFilterWorker(ExecEnv* env) : _exec_env(env), _thread([this] { execute(); }) {
Thread::set_thread_name(_thread, "runtime_filter");
_metrics = new RuntimeFilterWorkerMetrics();
}

RuntimeFilterWorker::~RuntimeFilterWorker() {
_queue.shutdown();
_thread.join();
if (_metrics) {
delete _metrics;
}
}

void RuntimeFilterWorker::open_query(const TUniqueId& query_id, const TQueryOptions& query_options,
Expand All @@ -508,6 +504,7 @@ void RuntimeFilterWorker::open_query(const TUniqueId& query_id, const TQueryOpti
ev.query_options = query_options;
ev.create_rf_merger_request = params;
ev.is_opened_by_pipeline = is_pipeline;
_metrics->update_event_nums(ev.type, 1);
_queue.put(std::move(ev));
}

Expand All @@ -516,6 +513,7 @@ void RuntimeFilterWorker::close_query(const TUniqueId& query_id) {
RuntimeFilterWorkerEvent ev;
ev.type = CLOSE_QUERY;
ev.query_id = query_id;
_metrics->update_event_nums(ev.type, 1);
_queue.put(std::move(ev));
}

Expand All @@ -527,6 +525,8 @@ void RuntimeFilterWorker::send_part_runtime_filter(PTransmitRuntimeFilterParams&
ev.transmit_timeout_ms = timeout_ms;
ev.transmit_addrs = addrs;
ev.transmit_rf_request = std::move(params);
_metrics->update_event_nums(ev.type, 1);
_metrics->update_rf_bytes(ev.type, ev.transmit_rf_request.data().size());
_queue.put(std::move(ev));
}

Expand All @@ -539,6 +539,8 @@ void RuntimeFilterWorker::send_broadcast_runtime_filter(PTransmitRuntimeFilterPa
ev.transmit_timeout_ms = timeout_ms;
ev.destinations = destinations;
ev.transmit_rf_request = std::move(params);
_metrics->update_event_nums(ev.type, 1);
_metrics->update_rf_bytes(ev.type, ev.transmit_rf_request.data().size());
_queue.put(std::move(ev));
}

Expand All @@ -559,8 +561,11 @@ void RuntimeFilterWorker::receive_runtime_filter(const PTransmitRuntimeFilterPar
ev.query_id.hi = params.query_id().hi();
ev.query_id.lo = params.query_id().lo();
ev.transmit_rf_request = params;
_metrics->update_event_nums(ev.type, 1);
_metrics->update_rf_bytes(ev.type, ev.transmit_rf_request.data().size());
_queue.put(std::move(ev));
}

// receive total runtime filter in pipeline engine.
static inline Status receive_total_runtime_filter_pipeline(
PTransmitRuntimeFilterParams& params, const std::shared_ptr<vectorized::JoinRuntimeFilter>& shared_rf) {
Expand Down Expand Up @@ -851,8 +856,14 @@ void RuntimeFilterWorker::execute() {
if (!_queue.blocking_get(&ev)) {
break;
}

LOG_IF_EVERY_N(INFO, _queue.get_size() > CpuInfo::num_cores() * 10, 10)
<< "runtime filter worker queue may be too large, size: " << _queue.get_size();

_metrics->update_event_nums(ev.type, -1);
switch (ev.type) {
case RECEIVE_TOTAL_RF: {
_metrics->update_rf_bytes(ev.type, -ev.transmit_rf_request.data().size());
_receive_total_runtime_filter(ev.transmit_rf_request);
break;
}
Expand Down Expand Up @@ -882,6 +893,7 @@ void RuntimeFilterWorker::execute() {
}

case RECEIVE_PART_RF: {
_metrics->update_rf_bytes(ev.type, -ev.transmit_rf_request.data().size());
auto it = _mergers.find(ev.query_id);
if (it == _mergers.end()) {
VLOG_QUERY << "receive part rf: rf merger not existed. query_id = " << ev.query_id;
Expand All @@ -895,11 +907,13 @@ void RuntimeFilterWorker::execute() {
}

case SEND_PART_RF: {
_metrics->update_rf_bytes(ev.type, -ev.transmit_rf_request.data().size());
_deliver_part_runtime_filter(std::move(ev.transmit_addrs), std::move(ev.transmit_rf_request),
ev.transmit_timeout_ms);
break;
}
case SEND_BROADCAST_GRF: {
_metrics->update_rf_bytes(ev.type, -ev.transmit_rf_request.data().size());
_process_send_broadcast_runtime_filter_event(std::move(ev.transmit_rf_request), std::move(ev.destinations),
ev.transmit_timeout_ms);
break;
Expand Down
46 changes: 45 additions & 1 deletion be/src/runtime/runtime_filter_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "gen_cpp/internal_service.pb.h"
#include "util/blocking_queue.hpp"
#include "util/ref_count_closure.h"
#include "util/system_metrics.h"
#include "util/uid_util.h"
namespace starrocks {

Expand Down Expand Up @@ -103,6 +104,35 @@ class RuntimeFilterMerger {
const bool _is_pipeline;
};

enum EventType {
RECEIVE_TOTAL_RF = 0,
CLOSE_QUERY = 1,
OPEN_QUERY = 2,
RECEIVE_PART_RF = 3,
SEND_PART_RF = 4,
SEND_BROADCAST_GRF = 5,
MAX_COUNT,
};

inline std::string EventTypeToString(EventType type) {
switch (type) {
case RECEIVE_TOTAL_RF:
return "RECEIVE_TOTAL_RF";
case CLOSE_QUERY:
return "CLOSE_QUERY";
case OPEN_QUERY:
return "OPEN_QUERY";
case RECEIVE_PART_RF:
return "RECEIVE_PART_RF";
case SEND_PART_RF:
return "SEND_PART_RF";
case SEND_BROADCAST_GRF:
return "SEND_BROADCAST_GRF";
default:
break;
}
__builtin_unreachable();
}
// RuntimeFilterWorker works in a separated thread, and does following jobs:
// 1. deserialize runtime filters.
// 2. merge runtime filters.
Expand All @@ -113,7 +143,17 @@ class RuntimeFilterMerger {
// - receive total RF and send it to RuntimeFilterPort
// - send partitioned RF(for hash join node)
// - close a query(delete runtime filter merger)
class RuntimeFilterWorkerEvent;
struct RuntimeFilterWorkerEvent;

struct RuntimeFilterWorkerMetrics {
void update_event_nums(EventType event_type, int64_t delta) { event_nums[event_type] += delta; }

void update_rf_bytes(EventType event_type, int64_t delta) { runtime_filter_bytes[event_type] += delta; }

std::array<std::atomic_int64_t, EventType::MAX_COUNT> event_nums{};
std::array<std::atomic_int64_t, EventType::MAX_COUNT> runtime_filter_bytes{};
};

class RuntimeFilterWorker {
public:
RuntimeFilterWorker(ExecEnv* env);
Expand All @@ -129,6 +169,9 @@ class RuntimeFilterWorker {
void send_broadcast_runtime_filter(PTransmitRuntimeFilterParams&& params,
const std::vector<TRuntimeFilterDestination>& destinations, int timeout_ms);

size_t queue_size() const;
const RuntimeFilterWorkerMetrics* metrics() const { return _metrics; }

private:
void _receive_total_runtime_filter(PTransmitRuntimeFilterParams& params);
void _process_send_broadcast_runtime_filter_event(PTransmitRuntimeFilterParams&& params,
Expand All @@ -149,6 +192,7 @@ class RuntimeFilterWorker {
std::unordered_map<TUniqueId, RuntimeFilterMerger> _mergers;
ExecEnv* _exec_env;
std::thread _thread;
RuntimeFilterWorkerMetrics* _metrics = nullptr;
};

}; // namespace starrocks
43 changes: 42 additions & 1 deletion be/src/util/system_metrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@
#include "column/column_pool.h"
#include "gutil/strings/split.h" // for string split
#include "gutil/strtoint.h" // for atoi64

#include "runtime/runtime_filter_worker.h"
#include "util/metrics.h"
namespace starrocks {

const char* const SystemMetrics::_s_hook_name = "system_metrics";
Expand Down Expand Up @@ -108,6 +109,12 @@ class QueryCacheMetrics {
METRIC_DEFINE_DOUBLE_GAUGE(query_cache_hit_ratio, MetricUnit::PERCENT);
};

class RuntimeFilterMetrics {
public:
METRIC_DEFINE_INT_GAUGE(runtime_filter_events_in_queue, MetricUnit::NOUNIT);
METRIC_DEFINE_INT_GAUGE(runtime_filter_bytes_in_queue, MetricUnit::BYTES);
};

SystemMetrics::SystemMetrics() = default;

SystemMetrics::~SystemMetrics() {
Expand All @@ -125,6 +132,9 @@ SystemMetrics::~SystemMetrics() {
if (_line_ptr != nullptr) {
free(_line_ptr);
}
for (auto& it : _runtime_filter_metrics) {
delete it.second;
}
}

void SystemMetrics::install(MetricRegistry* registry, const std::set<std::string>& disk_devices,
Expand All @@ -140,6 +150,7 @@ void SystemMetrics::install(MetricRegistry* registry, const std::set<std::string
_install_fd_metrics(registry);
_install_snmp_metrics(registry);
_install_query_cache_metrics(registry);
_install_runtime_filter_metrics(registry);
_registry = registry;
}

Expand All @@ -151,6 +162,7 @@ void SystemMetrics::update() {
_update_fd_metrics();
_update_snmp_metrics();
_update_query_cache_metrics();
_update_runtime_filter_metrics();
}

void SystemMetrics::_install_cpu_metrics(MetricRegistry* registry) {
Expand Down Expand Up @@ -668,6 +680,35 @@ void SystemMetrics::_install_query_cache_metrics(starrocks::MetricRegistry* regi
registry->register_metric("query_cache_hit_ratio", &_query_cache_metrics->query_cache_hit_ratio);
}

void SystemMetrics::_install_runtime_filter_metrics(starrocks::MetricRegistry* registry) {
for (int i = 0; i < EventType::MAX_COUNT; i++) {
auto* metrics = new RuntimeFilterMetrics();
const auto& type = EventTypeToString((EventType)i);
#define REGISTER_RUNTIME_FILTER_METRIC(name) \
registry->register_metric(#name, MetricLabels().add("type", type), &metrics->name)
REGISTER_RUNTIME_FILTER_METRIC(runtime_filter_events_in_queue);
REGISTER_RUNTIME_FILTER_METRIC(runtime_filter_bytes_in_queue);
_runtime_filter_metrics.emplace(type, metrics);
}
}

void SystemMetrics::_update_runtime_filter_metrics() {
auto* runtime_filter_worker = ExecEnv::GetInstance()->runtime_filter_worker();
if (UNLIKELY(runtime_filter_worker == nullptr)) {
return;
}
const auto* metrics = runtime_filter_worker->metrics();
for (int i = 0; i < EventType::MAX_COUNT; i++) {
const auto& event_name = EventTypeToString((EventType)i);
auto iter = _runtime_filter_metrics.find(event_name);
if (iter == _runtime_filter_metrics.end()) {
continue;
}
iter->second->runtime_filter_events_in_queue.set_value(metrics->event_nums[i]);
iter->second->runtime_filter_bytes_in_queue.set_value(metrics->runtime_filter_bytes[i]);
}
}

void SystemMetrics::_update_fd_metrics() {
#ifdef BE_TEST
FILE* fp = fopen(k_ut_fd_path, "r");
Expand Down
6 changes: 6 additions & 0 deletions be/src/util/system_metrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ class NetMetrics;
class FileDescriptorMetrics;
class SnmpMetrics;
class QueryCacheMetrics;
class RuntimeFilterMetrics;

class MemoryMetrics {
public:
Expand Down Expand Up @@ -142,6 +143,10 @@ class SystemMetrics {

void _update_query_cache_metrics();

void _install_runtime_filter_metrics(MetricRegistry* registry);

void _update_runtime_filter_metrics();

private:
static const char* const _s_hook_name;

Expand All @@ -151,6 +156,7 @@ class SystemMetrics {
std::map<std::string, NetMetrics*> _net_metrics;
std::unique_ptr<FileDescriptorMetrics> _fd_metrics;
std::unique_ptr<QueryCacheMetrics> _query_cache_metrics;
std::map<std::string, RuntimeFilterMetrics*> _runtime_filter_metrics;
int _proc_net_dev_version = 0;
std::unique_ptr<SnmpMetrics> _snmp_metrics;

Expand Down

0 comments on commit 5157bc4

Please sign in to comment.