Skip to content

Commit

Permalink
Merge branch 'master' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
pingchunzhang authored Dec 10, 2024
2 parents fcbc153 + 7dbae6e commit 2da059e
Show file tree
Hide file tree
Showing 22 changed files with 379 additions and 114 deletions.
2 changes: 2 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1216,6 +1216,8 @@ DEFINE_mString(doris_cgroup_cpu_path, "");
DEFINE_mBool(enable_be_proc_monitor, "false");
DEFINE_mInt32(be_proc_monitor_interval_ms, "10000");

DEFINE_Int32(workload_group_metrics_interval_ms, "5000");

DEFINE_mBool(enable_workload_group_memory_gc, "true");

DEFINE_Bool(ignore_always_true_predicate_for_segment, "true");
Expand Down
1 change: 1 addition & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1295,6 +1295,7 @@ DECLARE_mBool(exit_on_exception);
DECLARE_mString(doris_cgroup_cpu_path);
DECLARE_mBool(enable_be_proc_monitor);
DECLARE_mInt32(be_proc_monitor_interval_ms);
DECLARE_Int32(workload_group_metrics_interval_ms);

DECLARE_mBool(enable_workload_group_memory_gc);

Expand Down
13 changes: 13 additions & 0 deletions be/src/common/daemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -522,6 +522,13 @@ void Daemon::be_proc_monitor_thread() {
}
}

void Daemon::calculate_workload_group_metrics_thread() {
while (!_stop_background_threads_latch.wait_for(
std::chrono::milliseconds(config::workload_group_metrics_interval_ms))) {
ExecEnv::GetInstance()->workload_group_mgr()->refresh_workload_group_metrics();
}
}

void Daemon::start() {
Status st;
st = Thread::create(
Expand Down Expand Up @@ -570,6 +577,12 @@ void Daemon::start() {
&_threads.emplace_back());
}
CHECK(st.ok()) << st;

st = Thread::create(
"Daemon", "workload_group_metrics",
[this]() { this->calculate_workload_group_metrics_thread(); },
&_threads.emplace_back());
CHECK(st.ok()) << st;
}

void Daemon::stop() {
Expand Down
1 change: 1 addition & 0 deletions be/src/common/daemon.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ class Daemon {
void cache_prune_stale_thread();
void report_runtime_query_statistics_thread();
void be_proc_monitor_thread();
void calculate_workload_group_metrics_thread();

CountDownLatch _stop_background_threads_latch;
std::vector<scoped_refptr<Thread>> _threads;
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/pipeline_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ Status PipelineTask::execute(bool* eos) {
if (cpu_qs) {
cpu_qs->add_cpu_nanos(delta_cpu_time);
}
query_context()->update_wg_cpu_adder(delta_cpu_time);
query_context()->update_cpu_time(delta_cpu_time);
}};
if (_wait_to_start()) {
return Status::OK();
Expand Down
33 changes: 19 additions & 14 deletions be/src/runtime/fragment_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -648,12 +648,25 @@ Status FragmentMgr::_get_or_create_query_ctx(const TPipelineFragmentParams& para
}

if (!query_ctx) {
WorkloadGroupPtr workload_group_ptr = nullptr;
std::string wg_info_str = "Workload Group not set";
if (params.__isset.workload_groups && !params.workload_groups.empty()) {
uint64_t wg_id = params.workload_groups[0].id;
workload_group_ptr = _exec_env->workload_group_mgr()->get_group(wg_id);
if (workload_group_ptr != nullptr) {
wg_info_str = workload_group_ptr->debug_string();
} else {
wg_info_str = "set wg but not find it in be";
}
}

// First time a fragment of a query arrived. print logs.
LOG(INFO) << "query_id: " << print_id(query_id) << ", coord_addr: " << params.coord
<< ", total fragment num on current host: " << params.fragment_num_on_host
<< ", fe process uuid: " << params.query_options.fe_process_uuid
<< ", query type: " << params.query_options.query_type
<< ", report audit fe:" << params.current_connect_fe;
<< ", report audit fe:" << params.current_connect_fe
<< ", use wg:" << wg_info_str;

// This may be a first fragment request of the query.
// Create the query fragments context.
Expand All @@ -678,19 +691,11 @@ Status FragmentMgr::_get_or_create_query_ctx(const TPipelineFragmentParams& para

_set_scan_concurrency(params, query_ctx.get());

if (params.__isset.workload_groups && !params.workload_groups.empty()) {
uint64_t tg_id = params.workload_groups[0].id;
WorkloadGroupPtr workload_group_ptr =
_exec_env->workload_group_mgr()->get_task_group_by_id(tg_id);
if (workload_group_ptr != nullptr) {
RETURN_IF_ERROR(workload_group_ptr->add_query(query_id, query_ctx));
RETURN_IF_ERROR(query_ctx->set_workload_group(workload_group_ptr));
_exec_env->runtime_query_statistics_mgr()->set_workload_group_id(
print_id(query_id), tg_id);
} else {
LOG(WARNING) << "Query/load id: " << print_id(query_ctx->query_id())
<< "can't find its workload group " << tg_id;
}
if (workload_group_ptr != nullptr) {
RETURN_IF_ERROR(workload_group_ptr->add_query(query_id, query_ctx));
query_ctx->set_workload_group(workload_group_ptr);
_exec_env->runtime_query_statistics_mgr()->set_workload_group_id(
print_id(query_id), workload_group_ptr->id());
}
// There is some logic in query ctx's dctor, we could not check if exists and delete the
// temp query ctx now. For example, the query id maybe removed from workload group's queryset.
Expand Down
2 changes: 1 addition & 1 deletion be/src/runtime/load_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ LoadChannel::LoadChannel(const UniqueId& load_id, int64_t timeout_s, bool is_hig
fmt::format("(FromLoadChannel)Load#Id={}", _load_id.to_string()));
if (wg_id > 0) {
WorkloadGroupPtr workload_group_ptr =
ExecEnv::GetInstance()->workload_group_mgr()->get_task_group_by_id(wg_id);
ExecEnv::GetInstance()->workload_group_mgr()->get_group(wg_id);
if (workload_group_ptr) {
wg_ptr = workload_group_ptr;
wg_ptr->add_mem_tracker_limiter(mem_tracker);
Expand Down
3 changes: 1 addition & 2 deletions be/src/runtime/query_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -323,14 +323,13 @@ ThreadPool* QueryContext::get_memtable_flush_pool() {
}
}

Status QueryContext::set_workload_group(WorkloadGroupPtr& tg) {
void QueryContext::set_workload_group(WorkloadGroupPtr& tg) {
_workload_group = tg;
// Should add query first, then the workload group will not be deleted.
// see task_group_manager::delete_workload_group_by_ids
_workload_group->add_mem_tracker_limiter(query_mem_tracker);
_workload_group->get_query_scheduler(&_task_scheduler, &_scan_task_scheduler,
&_memtable_flush_pool, &_remote_scan_task_scheduler);
return Status::OK();
}

void QueryContext::add_fragment_profile(
Expand Down
6 changes: 3 additions & 3 deletions be/src/runtime/query_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ class QueryContext {
}
}

Status set_workload_group(WorkloadGroupPtr& tg);
void set_workload_group(WorkloadGroupPtr& tg);

int execution_timeout() const {
return _query_options.__isset.execution_timeout ? _query_options.execution_timeout
Expand Down Expand Up @@ -241,9 +241,9 @@ class QueryContext {
// only for file scan node
std::map<int, TFileScanRangeParams> file_scan_range_params_map;

void update_wg_cpu_adder(int64_t delta_cpu_time) {
void update_cpu_time(int64_t delta_cpu_time) {
if (_workload_group != nullptr) {
_workload_group->update_cpu_adder(delta_cpu_time);
_workload_group->update_cpu_time(delta_cpu_time);
}
}

Expand Down
72 changes: 40 additions & 32 deletions be/src/runtime/thread_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -119,38 +119,40 @@
__VA_ARGS__; \
} while (0)

#define LIMIT_LOCAL_SCAN_IO(data_dir, bytes_read) \
std::shared_ptr<IOThrottle> iot = nullptr; \
auto* t_ctx = doris::thread_context(true); \
if (t_ctx) { \
iot = t_ctx->get_local_scan_io_throttle(data_dir); \
} \
if (iot) { \
iot->acquire(-1); \
} \
Defer defer { \
[&]() { \
if (iot) { \
iot->update_next_io_time(*bytes_read); \
t_ctx->update_total_local_scan_io_adder(*bytes_read); \
} \
} \
#define LIMIT_LOCAL_SCAN_IO(data_dir, bytes_read) \
std::shared_ptr<IOThrottle> iot = nullptr; \
auto* t_ctx = doris::thread_context(true); \
if (t_ctx) { \
iot = t_ctx->get_local_scan_io_throttle(data_dir); \
} \
if (iot) { \
iot->acquire(-1); \
} \
Defer defer { \
[&]() { \
if (iot) { \
iot->update_next_io_time(*bytes_read); \
t_ctx->update_local_scan_io(data_dir, *bytes_read); \
} \
} \
}

#define LIMIT_REMOTE_SCAN_IO(bytes_read) \
std::shared_ptr<IOThrottle> iot = nullptr; \
if (auto* t_ctx = doris::thread_context(true)) { \
iot = t_ctx->get_remote_scan_io_throttle(); \
} \
if (iot) { \
iot->acquire(-1); \
} \
Defer defer { \
[&]() { \
if (iot) { \
iot->update_next_io_time(*bytes_read); \
} \
} \
#define LIMIT_REMOTE_SCAN_IO(bytes_read) \
std::shared_ptr<IOThrottle> iot = nullptr; \
auto* t_ctx = doris::thread_context(true); \
if (t_ctx) { \
iot = t_ctx->get_remote_scan_io_throttle(); \
} \
if (iot) { \
iot->acquire(-1); \
} \
Defer defer { \
[&]() { \
if (iot) { \
iot->update_next_io_time(*bytes_read); \
t_ctx->update_remote_scan_io(*bytes_read); \
} \
} \
}

namespace doris {
Expand Down Expand Up @@ -282,9 +284,15 @@ class ThreadContext {
return nullptr;
}

void update_total_local_scan_io_adder(size_t bytes_read) {
void update_local_scan_io(std::string path, size_t bytes_read) {
if (std::shared_ptr<WorkloadGroup> wg_ptr = _wg_wptr.lock()) {
wg_ptr->update_local_scan_io(path, bytes_read);
}
}

void update_remote_scan_io(size_t bytes_read) {
if (std::shared_ptr<WorkloadGroup> wg_ptr = _wg_wptr.lock()) {
wg_ptr->update_total_local_scan_io_adder(bytes_read);
wg_ptr->update_remote_scan_io(bytes_read);
}
}

Expand Down
36 changes: 17 additions & 19 deletions be/src/runtime/workload_group/workload_group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#include "runtime/exec_env.h"
#include "runtime/memory/global_memory_arbitrator.h"
#include "runtime/memory/mem_tracker_limiter.h"
#include "runtime/workload_group/workload_group_metrics.h"
#include "runtime/workload_management/io_throttle.h"
#include "util/mem_info.h"
#include "util/parse_util.h"
Expand Down Expand Up @@ -71,18 +72,11 @@ WorkloadGroup::WorkloadGroup(const WorkloadGroupInfo& tg_info, bool need_create_
_need_create_query_thread_pool(need_create_query_thread_pool) {
std::vector<DataDirInfo>& data_dir_list = io::BeConfDataDirReader::be_config_data_dir_list;
for (const auto& data_dir : data_dir_list) {
_scan_io_throttle_map[data_dir.path] =
std::make_shared<IOThrottle>(_name, data_dir.bvar_name + "_read_bytes");
}
_remote_scan_io_throttle = std::make_shared<IOThrottle>(_name, "remote_read_bytes");
_mem_used_status = std::make_unique<bvar::Status<int64_t>>(_name, "memory_used", 0);
_cpu_usage_adder = std::make_unique<bvar::Adder<uint64_t>>(_name, "cpu_usage_adder");
_cpu_usage_per_second = std::make_unique<bvar::PerSecond<bvar::Adder<uint64_t>>>(
_name, "cpu_usage", _cpu_usage_adder.get(), 10);
_total_local_scan_io_adder =
std::make_unique<bvar::Adder<size_t>>(_name, "total_local_read_bytes");
_total_local_scan_io_per_second = std::make_unique<bvar::PerSecond<bvar::Adder<size_t>>>(
_name, "total_local_read_bytes_per_second", _total_local_scan_io_adder.get(), 1);
_scan_io_throttle_map[data_dir.path] = std::make_shared<IOThrottle>(data_dir.bvar_name);
}
_remote_scan_io_throttle = std::make_shared<IOThrottle>();

_wg_metrics = std::make_shared<WorkloadGroupMetrics>(this);
}

std::string WorkloadGroup::debug_string() const {
Expand Down Expand Up @@ -169,11 +163,11 @@ int64_t WorkloadGroup::make_memory_tracker_snapshots(
}
// refresh total memory used.
_total_mem_used = used_memory;
_wg_metrics->update_memory_used_bytes(used_memory);
// reserve memory is recorded in the query mem tracker
// and _total_mem_used already contains all the current reserve memory.
// so after refreshing _total_mem_used, reset _wg_refresh_interval_memory_growth.
_wg_refresh_interval_memory_growth.store(0.0);
_mem_used_status->set_value(used_memory);
return used_memory;
}

Expand Down Expand Up @@ -658,16 +652,20 @@ std::shared_ptr<IOThrottle> WorkloadGroup::get_remote_scan_io_throttle() {
return _remote_scan_io_throttle;
}

void WorkloadGroup::update_cpu_adder(int64_t delta_cpu_time) {
(*_cpu_usage_adder) << (uint64_t)delta_cpu_time;
void WorkloadGroup::update_cpu_time(int64_t delta_cpu_time) {
_wg_metrics->update_cpu_time_nanos(delta_cpu_time);
}

void WorkloadGroup::update_local_scan_io(std::string path, size_t scan_bytes) {
_wg_metrics->update_local_scan_io_bytes(path, (uint64_t)scan_bytes);
}

void WorkloadGroup::update_total_local_scan_io_adder(size_t scan_bytes) {
(*_total_local_scan_io_adder) << scan_bytes;
void WorkloadGroup::update_remote_scan_io(size_t scan_bytes) {
_wg_metrics->update_remote_scan_io_bytes((uint64_t)scan_bytes);
}

int64_t WorkloadGroup::get_remote_scan_bytes_per_second() {
return _remote_scan_io_throttle->get_bvar_io_per_second();
int64_t WorkloadGroup::get_mem_used() {
return _total_mem_used;
}

void WorkloadGroup::try_stop_schedulers() {
Expand Down
27 changes: 12 additions & 15 deletions be/src/runtime/workload_group/workload_group.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

#pragma once

#include <bvar/bvar.h>
#include <gen_cpp/BackendService_types.h>
#include <stddef.h>
#include <stdint.h>
Expand Down Expand Up @@ -54,6 +53,8 @@ class TaskScheduler;
class WorkloadGroup;
struct WorkloadGroupInfo;
struct TrackerLimiterGroup;
class WorkloadGroupMetrics;

class WorkloadGroup : public std::enable_shared_from_this<WorkloadGroup> {
public:
explicit WorkloadGroup(const WorkloadGroupInfo& tg_info);
Expand Down Expand Up @@ -189,16 +190,13 @@ class WorkloadGroup : public std::enable_shared_from_this<WorkloadGroup> {

void upsert_scan_io_throttle(WorkloadGroupInfo* tg_info);

void update_cpu_adder(int64_t delta_cpu_time);
void update_cpu_time(int64_t delta_cpu_time);

void update_total_local_scan_io_adder(size_t scan_bytes);
void update_local_scan_io(std::string path, size_t scan_bytes);

int64_t get_mem_used() { return _mem_used_status->get_value(); }
uint64_t get_cpu_usage() { return _cpu_usage_per_second->get_value(); }
int64_t get_local_scan_bytes_per_second() {
return _total_local_scan_io_per_second->get_value();
}
int64_t get_remote_scan_bytes_per_second();
void update_remote_scan_io(size_t scan_bytes);

int64_t get_mem_used();

ThreadPool* get_memtable_flush_pool_ptr() {
// no lock here because this is called by memtable flush,
Expand All @@ -209,6 +207,10 @@ class WorkloadGroup : public std::enable_shared_from_this<WorkloadGroup> {

std::weak_ptr<CgroupCpuCtl> get_cgroup_cpu_ctl_wptr();

std::shared_ptr<WorkloadGroupMetrics> get_metrics() { return _wg_metrics; }

friend class WorkloadGroupMetrics;

private:
void create_cgroup_cpu_ctl_no_lock();
void upsert_cgroup_cpu_ctl_no_lock(WorkloadGroupInfo* wg_info);
Expand Down Expand Up @@ -260,12 +262,7 @@ class WorkloadGroup : public std::enable_shared_from_this<WorkloadGroup> {
// for some background workload, it doesn't need to create query thread pool
const bool _need_create_query_thread_pool;

// bvar metric
std::unique_ptr<bvar::Status<int64_t>> _mem_used_status;
std::unique_ptr<bvar::Adder<uint64_t>> _cpu_usage_adder;
std::unique_ptr<bvar::PerSecond<bvar::Adder<uint64_t>>> _cpu_usage_per_second;
std::unique_ptr<bvar::Adder<size_t>> _total_local_scan_io_adder;
std::unique_ptr<bvar::PerSecond<bvar::Adder<size_t>>> _total_local_scan_io_per_second;
std::shared_ptr<WorkloadGroupMetrics> _wg_metrics {nullptr};
};

using WorkloadGroupPtr = std::shared_ptr<WorkloadGroup>;
Expand Down
Loading

0 comments on commit 2da059e

Please sign in to comment.