Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Enhancement] add more profile and metrics to measure runtime bloom filter size(backport #46360) #46453

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions be/src/exec/pipeline/hashjoin/hash_join_build_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,15 @@ Status HashJoinBuildOperator::set_finishing(RuntimeState* state) {
auto&& in_filters = _partial_rf_merger->get_total_in_filters();
auto&& bloom_filters = _partial_rf_merger->get_total_bloom_filters();

{
size_t total_bf_bytes = std::accumulate(bloom_filters.begin(), bloom_filters.end(), 0ull,
[](size_t total, RuntimeBloomFilter* desc) -> size_t {
auto rf = desc->runtime_filter();
total += (rf == nullptr ? 0 : rf->bf_alloc_size());
return total;
});
_join_builder->update_partial_runtime_bloom_filter_bytes(total_bf_bytes);
}
// publish runtime bloom-filters
state->runtime_filter_port()->publish_runtime_filters(bloom_filters);
// move runtime filters into RuntimeFilterHub.
Expand Down
1 change: 1 addition & 0 deletions be/src/exec/vectorized/hash_joiner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ Status HashJoiner::prepare_builder(RuntimeState* state, RuntimeProfile* runtime_
_build_conjunct_evaluate_timer = ADD_TIMER(runtime_profile, "BuildConjunctEvaluateTime");
_build_buckets_counter = ADD_COUNTER_SKIP_MERGE(runtime_profile, "BuildBuckets", TUnit::UNIT);
_runtime_filter_num = ADD_COUNTER(runtime_profile, "RuntimeFilterNum", TUnit::UNIT);
_partial_runtime_bloom_filter_bytes = ADD_COUNTER(runtime_profile, "PartialRuntimeBloomFilterBytes", TUnit::BYTES);

HashTableParam param;
_init_hash_table_param(&param);
Expand Down
5 changes: 5 additions & 0 deletions be/src/exec/vectorized/hash_joiner.h
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,10 @@ class HashJoiner final : public pipeline::ContextWithDependency {
void set_prober_finished();
Columns string_key_columns() { return _string_key_columns; }

void update_partial_runtime_bloom_filter_bytes(int64_t total_bytes) {
COUNTER_UPDATE(_partial_runtime_bloom_filter_bytes, total_bytes);
}

private:
static bool _has_null(const ColumnPtr& column);

Expand Down Expand Up @@ -384,6 +388,7 @@ class HashJoiner final : public pipeline::ContextWithDependency {
RuntimeProfile::Counter* _output_build_column_timer = nullptr;
RuntimeProfile::Counter* _build_buckets_counter = nullptr;
RuntimeProfile::Counter* _runtime_filter_num = nullptr;
RuntimeProfile::Counter* _partial_runtime_bloom_filter_bytes = nullptr;

// Profile for hash join prober.
RuntimeProfile::Counter* _search_ht_timer = nullptr;
Expand Down
17 changes: 13 additions & 4 deletions be/src/exprs/vectorized/runtime_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,10 @@ class SimdBlockFilter {
// we can use can_use() to check if this bloom filter can be used
bool can_use() const { return _directory != nullptr; }

size_t get_alloc_size() const {
return _log_num_buckets == 0 ? 0 : (1ull << (_log_num_buckets + LOG_BUCKET_BYTE_SIZE));
}

private:
// The number of bits to set in a tiny Bloom filter block

Expand All @@ -139,10 +143,6 @@ class SimdBlockFilter {
// log2(number of bytes in a bucket):
static constexpr int LOG_BUCKET_BYTE_SIZE = 5;

size_t get_alloc_size() const {
return _log_num_buckets == 0 ? 0 : (1ull << (_log_num_buckets + LOG_BUCKET_BYTE_SIZE));
}

// Common:
// log_num_buckets_ is the log (base 2) of the number of buckets in the directory:
int _log_num_buckets = 0;
Expand Down Expand Up @@ -243,6 +243,15 @@ class JoinRuntimeFilter {
return _hash_partition_bf[0].can_use();
}

size_t bf_alloc_size() const {
if (_hash_partition_bf.empty()) {
return _bf.get_alloc_size();
}
return std::accumulate(
_hash_partition_bf.begin(), _hash_partition_bf.end(), 0ull,
[](size_t total, const SimdBlockFilter& bf) -> size_t { return total + bf.get_alloc_size(); });
}

virtual size_t max_serialized_size() const;
virtual size_t serialize(uint8_t* data) const;
virtual size_t deserialize(const uint8_t* data);
Expand Down
32 changes: 23 additions & 9 deletions be/src/runtime/runtime_filter_worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ void RuntimeFilterPort::receive_shared_runtime_filter(int32_t filter_id,
rf_desc->set_shared_runtime_filter(rf);
}
}

RuntimeFilterMerger::RuntimeFilterMerger(ExecEnv* env, const UniqueId& query_id, const TQueryOptions& query_options,
bool is_pipeline)
: _exec_env(env), _query_id(query_id), _query_options(query_options), _is_pipeline(is_pipeline) {}
Expand Down Expand Up @@ -457,15 +458,6 @@ void RuntimeFilterMerger::_send_total_runtime_filter(int32_t filter_id) {
pool->clear();
}

enum EventType {
RECEIVE_TOTAL_RF = 0,
CLOSE_QUERY = 1,
OPEN_QUERY = 2,
RECEIVE_PART_RF = 3,
SEND_PART_RF = 4,
SEND_BROADCAST_GRF = 5,
};

struct RuntimeFilterWorkerEvent {
public:
RuntimeFilterWorkerEvent() = default;
Expand All @@ -492,11 +484,15 @@ static_assert(std::is_move_assignable<RuntimeFilterWorkerEvent>::value);

RuntimeFilterWorker::RuntimeFilterWorker(ExecEnv* env) : _exec_env(env), _thread([this] { execute(); }) {
Thread::set_thread_name(_thread, "runtime_filter");
_metrics = new RuntimeFilterWorkerMetrics();
}

RuntimeFilterWorker::~RuntimeFilterWorker() {
_queue.shutdown();
_thread.join();
if (_metrics) {
delete _metrics;
}
}

void RuntimeFilterWorker::open_query(const TUniqueId& query_id, const TQueryOptions& query_options,
Expand All @@ -508,6 +504,7 @@ void RuntimeFilterWorker::open_query(const TUniqueId& query_id, const TQueryOpti
ev.query_options = query_options;
ev.create_rf_merger_request = params;
ev.is_opened_by_pipeline = is_pipeline;
_metrics->update_event_nums(ev.type, 1);
_queue.put(std::move(ev));
}

Expand All @@ -516,6 +513,7 @@ void RuntimeFilterWorker::close_query(const TUniqueId& query_id) {
RuntimeFilterWorkerEvent ev;
ev.type = CLOSE_QUERY;
ev.query_id = query_id;
_metrics->update_event_nums(ev.type, 1);
_queue.put(std::move(ev));
}

Expand All @@ -527,6 +525,8 @@ void RuntimeFilterWorker::send_part_runtime_filter(PTransmitRuntimeFilterParams&
ev.transmit_timeout_ms = timeout_ms;
ev.transmit_addrs = addrs;
ev.transmit_rf_request = std::move(params);
_metrics->update_event_nums(ev.type, 1);
_metrics->update_rf_bytes(ev.type, ev.transmit_rf_request.data().size());
_queue.put(std::move(ev));
}

Expand All @@ -539,6 +539,8 @@ void RuntimeFilterWorker::send_broadcast_runtime_filter(PTransmitRuntimeFilterPa
ev.transmit_timeout_ms = timeout_ms;
ev.destinations = destinations;
ev.transmit_rf_request = std::move(params);
_metrics->update_event_nums(ev.type, 1);
_metrics->update_rf_bytes(ev.type, ev.transmit_rf_request.data().size());
_queue.put(std::move(ev));
}

Expand All @@ -559,8 +561,11 @@ void RuntimeFilterWorker::receive_runtime_filter(const PTransmitRuntimeFilterPar
ev.query_id.hi = params.query_id().hi();
ev.query_id.lo = params.query_id().lo();
ev.transmit_rf_request = params;
_metrics->update_event_nums(ev.type, 1);
_metrics->update_rf_bytes(ev.type, ev.transmit_rf_request.data().size());
_queue.put(std::move(ev));
}

// receive total runtime filter in pipeline engine.
static inline Status receive_total_runtime_filter_pipeline(
PTransmitRuntimeFilterParams& params, const std::shared_ptr<vectorized::JoinRuntimeFilter>& shared_rf) {
Expand Down Expand Up @@ -851,8 +856,14 @@ void RuntimeFilterWorker::execute() {
if (!_queue.blocking_get(&ev)) {
break;
}

LOG_IF_EVERY_N(INFO, _queue.get_size() > CpuInfo::num_cores() * 10, 10)
<< "runtime filter worker queue may be too large, size: " << _queue.get_size();

_metrics->update_event_nums(ev.type, -1);
switch (ev.type) {
case RECEIVE_TOTAL_RF: {
_metrics->update_rf_bytes(ev.type, -ev.transmit_rf_request.data().size());
_receive_total_runtime_filter(ev.transmit_rf_request);
break;
}
Expand Down Expand Up @@ -882,6 +893,7 @@ void RuntimeFilterWorker::execute() {
}

case RECEIVE_PART_RF: {
_metrics->update_rf_bytes(ev.type, -ev.transmit_rf_request.data().size());
auto it = _mergers.find(ev.query_id);
if (it == _mergers.end()) {
VLOG_QUERY << "receive part rf: rf merger not existed. query_id = " << ev.query_id;
Expand All @@ -895,11 +907,13 @@ void RuntimeFilterWorker::execute() {
}

case SEND_PART_RF: {
_metrics->update_rf_bytes(ev.type, -ev.transmit_rf_request.data().size());
_deliver_part_runtime_filter(std::move(ev.transmit_addrs), std::move(ev.transmit_rf_request),
ev.transmit_timeout_ms);
break;
}
case SEND_BROADCAST_GRF: {
_metrics->update_rf_bytes(ev.type, -ev.transmit_rf_request.data().size());
_process_send_broadcast_runtime_filter_event(std::move(ev.transmit_rf_request), std::move(ev.destinations),
ev.transmit_timeout_ms);
break;
Expand Down
46 changes: 45 additions & 1 deletion be/src/runtime/runtime_filter_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "gen_cpp/internal_service.pb.h"
#include "util/blocking_queue.hpp"
#include "util/ref_count_closure.h"
#include "util/system_metrics.h"
#include "util/uid_util.h"
namespace starrocks {

Expand Down Expand Up @@ -103,6 +104,35 @@ class RuntimeFilterMerger {
const bool _is_pipeline;
};

enum EventType {
RECEIVE_TOTAL_RF = 0,
CLOSE_QUERY = 1,
OPEN_QUERY = 2,
RECEIVE_PART_RF = 3,
SEND_PART_RF = 4,
SEND_BROADCAST_GRF = 5,
MAX_COUNT,
};

inline std::string EventTypeToString(EventType type) {
switch (type) {
case RECEIVE_TOTAL_RF:
return "RECEIVE_TOTAL_RF";
case CLOSE_QUERY:
return "CLOSE_QUERY";
case OPEN_QUERY:
return "OPEN_QUERY";
case RECEIVE_PART_RF:
return "RECEIVE_PART_RF";
case SEND_PART_RF:
return "SEND_PART_RF";
case SEND_BROADCAST_GRF:
return "SEND_BROADCAST_GRF";
default:
break;
}
__builtin_unreachable();
}
// RuntimeFilterWorker works in a separated thread, and does following jobs:
// 1. deserialize runtime filters.
// 2. merge runtime filters.
Expand All @@ -113,7 +143,17 @@ class RuntimeFilterMerger {
// - receive total RF and send it to RuntimeFilterPort
// - send partitioned RF(for hash join node)
// - close a query(delete runtime filter merger)
class RuntimeFilterWorkerEvent;
struct RuntimeFilterWorkerEvent;

struct RuntimeFilterWorkerMetrics {
void update_event_nums(EventType event_type, int64_t delta) { event_nums[event_type] += delta; }

void update_rf_bytes(EventType event_type, int64_t delta) { runtime_filter_bytes[event_type] += delta; }

std::array<std::atomic_int64_t, EventType::MAX_COUNT> event_nums{};
std::array<std::atomic_int64_t, EventType::MAX_COUNT> runtime_filter_bytes{};
};

class RuntimeFilterWorker {
public:
RuntimeFilterWorker(ExecEnv* env);
Expand All @@ -129,6 +169,9 @@ class RuntimeFilterWorker {
void send_broadcast_runtime_filter(PTransmitRuntimeFilterParams&& params,
const std::vector<TRuntimeFilterDestination>& destinations, int timeout_ms);

size_t queue_size() const;
const RuntimeFilterWorkerMetrics* metrics() const { return _metrics; }

private:
void _receive_total_runtime_filter(PTransmitRuntimeFilterParams& params);
void _process_send_broadcast_runtime_filter_event(PTransmitRuntimeFilterParams&& params,
Expand All @@ -149,6 +192,7 @@ class RuntimeFilterWorker {
std::unordered_map<TUniqueId, RuntimeFilterMerger> _mergers;
ExecEnv* _exec_env;
std::thread _thread;
RuntimeFilterWorkerMetrics* _metrics = nullptr;
};

}; // namespace starrocks
43 changes: 42 additions & 1 deletion be/src/util/system_metrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@
#include "column/column_pool.h"
#include "gutil/strings/split.h" // for string split
#include "gutil/strtoint.h" // for atoi64

#include "runtime/runtime_filter_worker.h"
#include "util/metrics.h"
namespace starrocks {

const char* const SystemMetrics::_s_hook_name = "system_metrics";
Expand Down Expand Up @@ -108,6 +109,12 @@ class QueryCacheMetrics {
METRIC_DEFINE_DOUBLE_GAUGE(query_cache_hit_ratio, MetricUnit::PERCENT);
};

class RuntimeFilterMetrics {
public:
METRIC_DEFINE_INT_GAUGE(runtime_filter_events_in_queue, MetricUnit::NOUNIT);
METRIC_DEFINE_INT_GAUGE(runtime_filter_bytes_in_queue, MetricUnit::BYTES);
};

SystemMetrics::SystemMetrics() = default;

SystemMetrics::~SystemMetrics() {
Expand All @@ -125,6 +132,9 @@ SystemMetrics::~SystemMetrics() {
if (_line_ptr != nullptr) {
free(_line_ptr);
}
for (auto& it : _runtime_filter_metrics) {
delete it.second;
}
}

void SystemMetrics::install(MetricRegistry* registry, const std::set<std::string>& disk_devices,
Expand All @@ -140,6 +150,7 @@ void SystemMetrics::install(MetricRegistry* registry, const std::set<std::string
_install_fd_metrics(registry);
_install_snmp_metrics(registry);
_install_query_cache_metrics(registry);
_install_runtime_filter_metrics(registry);
_registry = registry;
}

Expand All @@ -151,6 +162,7 @@ void SystemMetrics::update() {
_update_fd_metrics();
_update_snmp_metrics();
_update_query_cache_metrics();
_update_runtime_filter_metrics();
}

void SystemMetrics::_install_cpu_metrics(MetricRegistry* registry) {
Expand Down Expand Up @@ -668,6 +680,35 @@ void SystemMetrics::_install_query_cache_metrics(starrocks::MetricRegistry* regi
registry->register_metric("query_cache_hit_ratio", &_query_cache_metrics->query_cache_hit_ratio);
}

void SystemMetrics::_install_runtime_filter_metrics(starrocks::MetricRegistry* registry) {
for (int i = 0; i < EventType::MAX_COUNT; i++) {
auto* metrics = new RuntimeFilterMetrics();
const auto& type = EventTypeToString((EventType)i);
#define REGISTER_RUNTIME_FILTER_METRIC(name) \
registry->register_metric(#name, MetricLabels().add("type", type), &metrics->name)
REGISTER_RUNTIME_FILTER_METRIC(runtime_filter_events_in_queue);
REGISTER_RUNTIME_FILTER_METRIC(runtime_filter_bytes_in_queue);
_runtime_filter_metrics.emplace(type, metrics);
}
}

void SystemMetrics::_update_runtime_filter_metrics() {
auto* runtime_filter_worker = ExecEnv::GetInstance()->runtime_filter_worker();
if (UNLIKELY(runtime_filter_worker == nullptr)) {
return;
}
const auto* metrics = runtime_filter_worker->metrics();
for (int i = 0; i < EventType::MAX_COUNT; i++) {
const auto& event_name = EventTypeToString((EventType)i);
auto iter = _runtime_filter_metrics.find(event_name);
if (iter == _runtime_filter_metrics.end()) {
continue;
}
iter->second->runtime_filter_events_in_queue.set_value(metrics->event_nums[i]);
iter->second->runtime_filter_bytes_in_queue.set_value(metrics->runtime_filter_bytes[i]);
}
}

void SystemMetrics::_update_fd_metrics() {
#ifdef BE_TEST
FILE* fp = fopen(k_ut_fd_path, "r");
Expand Down
6 changes: 6 additions & 0 deletions be/src/util/system_metrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ class NetMetrics;
class FileDescriptorMetrics;
class SnmpMetrics;
class QueryCacheMetrics;
class RuntimeFilterMetrics;

class MemoryMetrics {
public:
Expand Down Expand Up @@ -142,6 +143,10 @@ class SystemMetrics {

void _update_query_cache_metrics();

void _install_runtime_filter_metrics(MetricRegistry* registry);

void _update_runtime_filter_metrics();

private:
static const char* const _s_hook_name;

Expand All @@ -151,6 +156,7 @@ class SystemMetrics {
std::map<std::string, NetMetrics*> _net_metrics;
std::unique_ptr<FileDescriptorMetrics> _fd_metrics;
std::unique_ptr<QueryCacheMetrics> _query_cache_metrics;
std::map<std::string, RuntimeFilterMetrics*> _runtime_filter_metrics;
int _proc_net_dev_version = 0;
std::unique_ptr<SnmpMetrics> _snmp_metrics;

Expand Down
Loading