Skip to content

Commit

Permalink
[Improment]Add workload group resource usage metrics (#44870)
Browse files Browse the repository at this point in the history
  • Loading branch information
wangbo authored Dec 10, 2024
1 parent 972c8df commit 7dbae6e
Show file tree
Hide file tree
Showing 19 changed files with 353 additions and 92 deletions.
2 changes: 2 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1216,6 +1216,8 @@ DEFINE_mString(doris_cgroup_cpu_path, "");
DEFINE_mBool(enable_be_proc_monitor, "false");
DEFINE_mInt32(be_proc_monitor_interval_ms, "10000");

DEFINE_Int32(workload_group_metrics_interval_ms, "5000");

DEFINE_mBool(enable_workload_group_memory_gc, "true");

DEFINE_Bool(ignore_always_true_predicate_for_segment, "true");
Expand Down
1 change: 1 addition & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1295,6 +1295,7 @@ DECLARE_mBool(exit_on_exception);
DECLARE_mString(doris_cgroup_cpu_path);
DECLARE_mBool(enable_be_proc_monitor);
DECLARE_mInt32(be_proc_monitor_interval_ms);
DECLARE_Int32(workload_group_metrics_interval_ms);

DECLARE_mBool(enable_workload_group_memory_gc);

Expand Down
13 changes: 13 additions & 0 deletions be/src/common/daemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -522,6 +522,13 @@ void Daemon::be_proc_monitor_thread() {
}
}

void Daemon::calculate_workload_group_metrics_thread() {
while (!_stop_background_threads_latch.wait_for(
std::chrono::milliseconds(config::workload_group_metrics_interval_ms))) {
ExecEnv::GetInstance()->workload_group_mgr()->refresh_workload_group_metrics();
}
}

void Daemon::start() {
Status st;
st = Thread::create(
Expand Down Expand Up @@ -570,6 +577,12 @@ void Daemon::start() {
&_threads.emplace_back());
}
CHECK(st.ok()) << st;

st = Thread::create(
"Daemon", "workload_group_metrics",
[this]() { this->calculate_workload_group_metrics_thread(); },
&_threads.emplace_back());
CHECK(st.ok()) << st;
}

void Daemon::stop() {
Expand Down
1 change: 1 addition & 0 deletions be/src/common/daemon.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ class Daemon {
void cache_prune_stale_thread();
void report_runtime_query_statistics_thread();
void be_proc_monitor_thread();
void calculate_workload_group_metrics_thread();

CountDownLatch _stop_background_threads_latch;
std::vector<scoped_refptr<Thread>> _threads;
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/pipeline_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ Status PipelineTask::execute(bool* eos) {
if (cpu_qs) {
cpu_qs->add_cpu_nanos(delta_cpu_time);
}
query_context()->update_wg_cpu_adder(delta_cpu_time);
query_context()->update_cpu_time(delta_cpu_time);
}};
if (_wait_to_start()) {
return Status::OK();
Expand Down
4 changes: 2 additions & 2 deletions be/src/runtime/query_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -241,9 +241,9 @@ class QueryContext {
// only for file scan node
std::map<int, TFileScanRangeParams> file_scan_range_params_map;

void update_wg_cpu_adder(int64_t delta_cpu_time) {
void update_cpu_time(int64_t delta_cpu_time) {
if (_workload_group != nullptr) {
_workload_group->update_cpu_adder(delta_cpu_time);
_workload_group->update_cpu_time(delta_cpu_time);
}
}

Expand Down
72 changes: 40 additions & 32 deletions be/src/runtime/thread_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -119,38 +119,40 @@
__VA_ARGS__; \
} while (0)

#define LIMIT_LOCAL_SCAN_IO(data_dir, bytes_read) \
std::shared_ptr<IOThrottle> iot = nullptr; \
auto* t_ctx = doris::thread_context(true); \
if (t_ctx) { \
iot = t_ctx->get_local_scan_io_throttle(data_dir); \
} \
if (iot) { \
iot->acquire(-1); \
} \
Defer defer { \
[&]() { \
if (iot) { \
iot->update_next_io_time(*bytes_read); \
t_ctx->update_total_local_scan_io_adder(*bytes_read); \
} \
} \
#define LIMIT_LOCAL_SCAN_IO(data_dir, bytes_read) \
std::shared_ptr<IOThrottle> iot = nullptr; \
auto* t_ctx = doris::thread_context(true); \
if (t_ctx) { \
iot = t_ctx->get_local_scan_io_throttle(data_dir); \
} \
if (iot) { \
iot->acquire(-1); \
} \
Defer defer { \
[&]() { \
if (iot) { \
iot->update_next_io_time(*bytes_read); \
t_ctx->update_local_scan_io(data_dir, *bytes_read); \
} \
} \
}

#define LIMIT_REMOTE_SCAN_IO(bytes_read) \
std::shared_ptr<IOThrottle> iot = nullptr; \
if (auto* t_ctx = doris::thread_context(true)) { \
iot = t_ctx->get_remote_scan_io_throttle(); \
} \
if (iot) { \
iot->acquire(-1); \
} \
Defer defer { \
[&]() { \
if (iot) { \
iot->update_next_io_time(*bytes_read); \
} \
} \
#define LIMIT_REMOTE_SCAN_IO(bytes_read) \
std::shared_ptr<IOThrottle> iot = nullptr; \
auto* t_ctx = doris::thread_context(true); \
if (t_ctx) { \
iot = t_ctx->get_remote_scan_io_throttle(); \
} \
if (iot) { \
iot->acquire(-1); \
} \
Defer defer { \
[&]() { \
if (iot) { \
iot->update_next_io_time(*bytes_read); \
t_ctx->update_remote_scan_io(*bytes_read); \
} \
} \
}

namespace doris {
Expand Down Expand Up @@ -282,9 +284,15 @@ class ThreadContext {
return nullptr;
}

void update_total_local_scan_io_adder(size_t bytes_read) {
void update_local_scan_io(std::string path, size_t bytes_read) {
if (std::shared_ptr<WorkloadGroup> wg_ptr = _wg_wptr.lock()) {
wg_ptr->update_local_scan_io(path, bytes_read);
}
}

void update_remote_scan_io(size_t bytes_read) {
if (std::shared_ptr<WorkloadGroup> wg_ptr = _wg_wptr.lock()) {
wg_ptr->update_total_local_scan_io_adder(bytes_read);
wg_ptr->update_remote_scan_io(bytes_read);
}
}

Expand Down
36 changes: 17 additions & 19 deletions be/src/runtime/workload_group/workload_group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#include "runtime/exec_env.h"
#include "runtime/memory/global_memory_arbitrator.h"
#include "runtime/memory/mem_tracker_limiter.h"
#include "runtime/workload_group/workload_group_metrics.h"
#include "runtime/workload_management/io_throttle.h"
#include "util/mem_info.h"
#include "util/parse_util.h"
Expand Down Expand Up @@ -71,18 +72,11 @@ WorkloadGroup::WorkloadGroup(const WorkloadGroupInfo& tg_info, bool need_create_
_need_create_query_thread_pool(need_create_query_thread_pool) {
std::vector<DataDirInfo>& data_dir_list = io::BeConfDataDirReader::be_config_data_dir_list;
for (const auto& data_dir : data_dir_list) {
_scan_io_throttle_map[data_dir.path] =
std::make_shared<IOThrottle>(_name, data_dir.bvar_name + "_read_bytes");
}
_remote_scan_io_throttle = std::make_shared<IOThrottle>(_name, "remote_read_bytes");
_mem_used_status = std::make_unique<bvar::Status<int64_t>>(_name, "memory_used", 0);
_cpu_usage_adder = std::make_unique<bvar::Adder<uint64_t>>(_name, "cpu_usage_adder");
_cpu_usage_per_second = std::make_unique<bvar::PerSecond<bvar::Adder<uint64_t>>>(
_name, "cpu_usage", _cpu_usage_adder.get(), 10);
_total_local_scan_io_adder =
std::make_unique<bvar::Adder<size_t>>(_name, "total_local_read_bytes");
_total_local_scan_io_per_second = std::make_unique<bvar::PerSecond<bvar::Adder<size_t>>>(
_name, "total_local_read_bytes_per_second", _total_local_scan_io_adder.get(), 1);
_scan_io_throttle_map[data_dir.path] = std::make_shared<IOThrottle>(data_dir.bvar_name);
}
_remote_scan_io_throttle = std::make_shared<IOThrottle>();

_wg_metrics = std::make_shared<WorkloadGroupMetrics>(this);
}

std::string WorkloadGroup::debug_string() const {
Expand Down Expand Up @@ -169,11 +163,11 @@ int64_t WorkloadGroup::make_memory_tracker_snapshots(
}
// refresh total memory used.
_total_mem_used = used_memory;
_wg_metrics->update_memory_used_bytes(used_memory);
// reserve memory is recorded in the query mem tracker
// and _total_mem_used already contains all the current reserve memory.
// so after refreshing _total_mem_used, reset _wg_refresh_interval_memory_growth.
_wg_refresh_interval_memory_growth.store(0.0);
_mem_used_status->set_value(used_memory);
return used_memory;
}

Expand Down Expand Up @@ -658,16 +652,20 @@ std::shared_ptr<IOThrottle> WorkloadGroup::get_remote_scan_io_throttle() {
return _remote_scan_io_throttle;
}

void WorkloadGroup::update_cpu_adder(int64_t delta_cpu_time) {
(*_cpu_usage_adder) << (uint64_t)delta_cpu_time;
void WorkloadGroup::update_cpu_time(int64_t delta_cpu_time) {
_wg_metrics->update_cpu_time_nanos(delta_cpu_time);
}

void WorkloadGroup::update_local_scan_io(std::string path, size_t scan_bytes) {
_wg_metrics->update_local_scan_io_bytes(path, (uint64_t)scan_bytes);
}

void WorkloadGroup::update_total_local_scan_io_adder(size_t scan_bytes) {
(*_total_local_scan_io_adder) << scan_bytes;
void WorkloadGroup::update_remote_scan_io(size_t scan_bytes) {
_wg_metrics->update_remote_scan_io_bytes((uint64_t)scan_bytes);
}

int64_t WorkloadGroup::get_remote_scan_bytes_per_second() {
return _remote_scan_io_throttle->get_bvar_io_per_second();
int64_t WorkloadGroup::get_mem_used() {
return _total_mem_used;
}

void WorkloadGroup::try_stop_schedulers() {
Expand Down
27 changes: 12 additions & 15 deletions be/src/runtime/workload_group/workload_group.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

#pragma once

#include <bvar/bvar.h>
#include <gen_cpp/BackendService_types.h>
#include <stddef.h>
#include <stdint.h>
Expand Down Expand Up @@ -54,6 +53,8 @@ class TaskScheduler;
class WorkloadGroup;
struct WorkloadGroupInfo;
struct TrackerLimiterGroup;
class WorkloadGroupMetrics;

class WorkloadGroup : public std::enable_shared_from_this<WorkloadGroup> {
public:
explicit WorkloadGroup(const WorkloadGroupInfo& tg_info);
Expand Down Expand Up @@ -189,16 +190,13 @@ class WorkloadGroup : public std::enable_shared_from_this<WorkloadGroup> {

void upsert_scan_io_throttle(WorkloadGroupInfo* tg_info);

void update_cpu_adder(int64_t delta_cpu_time);
void update_cpu_time(int64_t delta_cpu_time);

void update_total_local_scan_io_adder(size_t scan_bytes);
void update_local_scan_io(std::string path, size_t scan_bytes);

int64_t get_mem_used() { return _mem_used_status->get_value(); }
uint64_t get_cpu_usage() { return _cpu_usage_per_second->get_value(); }
int64_t get_local_scan_bytes_per_second() {
return _total_local_scan_io_per_second->get_value();
}
int64_t get_remote_scan_bytes_per_second();
void update_remote_scan_io(size_t scan_bytes);

int64_t get_mem_used();

ThreadPool* get_memtable_flush_pool_ptr() {
// no lock here because this is called by memtable flush,
Expand All @@ -209,6 +207,10 @@ class WorkloadGroup : public std::enable_shared_from_this<WorkloadGroup> {

std::weak_ptr<CgroupCpuCtl> get_cgroup_cpu_ctl_wptr();

std::shared_ptr<WorkloadGroupMetrics> get_metrics() { return _wg_metrics; }

friend class WorkloadGroupMetrics;

private:
void create_cgroup_cpu_ctl_no_lock();
void upsert_cgroup_cpu_ctl_no_lock(WorkloadGroupInfo* wg_info);
Expand Down Expand Up @@ -260,12 +262,7 @@ class WorkloadGroup : public std::enable_shared_from_this<WorkloadGroup> {
// for some background workload, it doesn't need to create query thread pool
const bool _need_create_query_thread_pool;

// bvar metric
std::unique_ptr<bvar::Status<int64_t>> _mem_used_status;
std::unique_ptr<bvar::Adder<uint64_t>> _cpu_usage_adder;
std::unique_ptr<bvar::PerSecond<bvar::Adder<uint64_t>>> _cpu_usage_per_second;
std::unique_ptr<bvar::Adder<size_t>> _total_local_scan_io_adder;
std::unique_ptr<bvar::PerSecond<bvar::Adder<size_t>>> _total_local_scan_io_per_second;
std::shared_ptr<WorkloadGroupMetrics> _wg_metrics {nullptr};
};

using WorkloadGroupPtr = std::shared_ptr<WorkloadGroup>;
Expand Down
20 changes: 15 additions & 5 deletions be/src/runtime/workload_group/workload_group_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "pipeline/task_scheduler.h"
#include "runtime/memory/mem_tracker_limiter.h"
#include "runtime/workload_group/workload_group.h"
#include "runtime/workload_group/workload_group_metrics.h"
#include "util/mem_info.h"
#include "util/threadpool.h"
#include "util/time.h"
Expand Down Expand Up @@ -287,16 +288,25 @@ void WorkloadGroupMgr::get_wg_resource_usage(vectorized::Block* block) {
for (const auto& [id, wg] : _workload_groups) {
SchemaScannerHelper::insert_int64_value(0, be_id, block);
SchemaScannerHelper::insert_int64_value(1, wg->id(), block);
SchemaScannerHelper::insert_int64_value(2, wg->get_mem_used(), block);
SchemaScannerHelper::insert_int64_value(2, wg->get_metrics()->get_memory_used(), block);

double cpu_usage_p =
(double)wg->get_cpu_usage() / (double)total_cpu_time_ns_per_second * 100;
double cpu_usage_p = (double)wg->get_metrics()->get_cpu_time_nanos_per_second() /
(double)total_cpu_time_ns_per_second * 100;
cpu_usage_p = std::round(cpu_usage_p * 100.0) / 100.0;

SchemaScannerHelper::insert_double_value(3, cpu_usage_p, block);

SchemaScannerHelper::insert_int64_value(4, wg->get_local_scan_bytes_per_second(), block);
SchemaScannerHelper::insert_int64_value(5, wg->get_remote_scan_bytes_per_second(), block);
SchemaScannerHelper::insert_int64_value(
4, wg->get_metrics()->get_local_scan_bytes_per_second(), block);
SchemaScannerHelper::insert_int64_value(
5, wg->get_metrics()->get_remote_scan_bytes_per_second(), block);
}
}

void WorkloadGroupMgr::refresh_workload_group_metrics() {
std::shared_lock<std::shared_mutex> r_lock(_group_mutex);
for (const auto& [id, wg] : _workload_groups) {
wg->get_metrics()->refresh_metrics();
}
}

Expand Down
2 changes: 2 additions & 0 deletions be/src/runtime/workload_group/workload_group_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ class WorkloadGroupMgr {
return _workload_groups[INTERNAL_WORKLOAD_GROUP_ID];
}

void refresh_workload_group_metrics();

private:
std::shared_mutex _group_mutex;
std::unordered_map<uint64_t, WorkloadGroupPtr> _workload_groups;
Expand Down
Loading

0 comments on commit 7dbae6e

Please sign in to comment.