Skip to content

Commit

Permalink
[wg](chore) rename workload group memory property names (#44028)
Browse files Browse the repository at this point in the history
  • Loading branch information
jacktengg authored Nov 20, 2024
1 parent b3cb480 commit ee9252a
Show file tree
Hide file tree
Showing 8 changed files with 116 additions and 96 deletions.
4 changes: 2 additions & 2 deletions be/src/exec/schema_scanner/schema_workload_groups_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ std::vector<SchemaScanner::ColumnDesc> SchemaWorkloadGroupsScanner::_s_tbls_colu
{"SCAN_THREAD_NUM", TYPE_BIGINT, sizeof(int64_t), true},
{"MAX_REMOTE_SCAN_THREAD_NUM", TYPE_BIGINT, sizeof(int64_t), true},
{"MIN_REMOTE_SCAN_THREAD_NUM", TYPE_BIGINT, sizeof(int64_t), true},
{"SPILL_THRESHOLD_LOW_WATERMARK", TYPE_VARCHAR, sizeof(StringRef), true},
{"SPILL_THRESHOLD_HIGH_WATERMARK", TYPE_VARCHAR, sizeof(StringRef), true},
{"MEMORY_LOW_WATERMARK", TYPE_VARCHAR, sizeof(StringRef), true},
{"MEMORY_HIGH_WATERMARK", TYPE_VARCHAR, sizeof(StringRef), true},
{"TAG", TYPE_VARCHAR, sizeof(StringRef), true},
{"READ_BYTES_PER_SECOND", TYPE_BIGINT, sizeof(int64_t), true},
{"REMOTE_READ_BYTES_PER_SECOND", TYPE_BIGINT, sizeof(int64_t), true},
Expand Down
42 changes: 21 additions & 21 deletions be/src/runtime/workload_group/workload_group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ namespace doris {
const static std::string MEMORY_LIMIT_DEFAULT_VALUE = "0%";
const static bool ENABLE_MEMORY_OVERCOMMIT_DEFAULT_VALUE = true;
const static int CPU_HARD_LIMIT_DEFAULT_VALUE = -1;
const static int SPILL_LOW_WATERMARK_DEFAULT_VALUE = 50;
const static int SPILL_HIGH_WATERMARK_DEFAULT_VALUE = 80;
const static int MEMORY_LOW_WATERMARK_DEFAULT_VALUE = 50;
const static int MEMORY_HIGH_WATERMARK_DEFAULT_VALUE = 80;

WorkloadGroup::WorkloadGroup(const WorkloadGroupInfo& wg_info) : WorkloadGroup(wg_info, true) {}

Expand All @@ -64,8 +64,8 @@ WorkloadGroup::WorkloadGroup(const WorkloadGroupInfo& tg_info, bool need_create_
_scan_thread_num(tg_info.scan_thread_num),
_max_remote_scan_thread_num(tg_info.max_remote_scan_thread_num),
_min_remote_scan_thread_num(tg_info.min_remote_scan_thread_num),
_spill_low_watermark(tg_info.spill_low_watermark),
_spill_high_watermark(tg_info.spill_high_watermark),
_memory_low_watermark(tg_info.memory_low_watermark),
_memory_high_watermark(tg_info.memory_high_watermark),
_scan_bytes_per_second(tg_info.read_bytes_per_second),
_remote_scan_bytes_per_second(tg_info.remote_read_bytes_per_second),
_need_create_query_thread_pool(need_create_query_thread_pool) {
Expand All @@ -91,27 +91,27 @@ std::string WorkloadGroup::debug_string() const {
"TG[id = {}, name = {}, cpu_share = {}, memory_limit = {}, enable_memory_overcommit = "
"{}, version = {}, cpu_hard_limit = {}, scan_thread_num = "
"{}, max_remote_scan_thread_num = {}, min_remote_scan_thread_num = {}, "
"spill_low_watermark={}, spill_high_watermark={}, is_shutdown={}, query_num={}, "
"memory_low_watermark={}, memory_high_watermark={}, is_shutdown={}, query_num={}, "
"read_bytes_per_second={}, remote_read_bytes_per_second={}]",
_id, _name, cpu_share(), PrettyPrinter::print(_memory_limit, TUnit::BYTES),
_enable_memory_overcommit ? "true" : "false", _version, cpu_hard_limit(),
_scan_thread_num, _max_remote_scan_thread_num, _min_remote_scan_thread_num,
_spill_low_watermark, _spill_high_watermark, _is_shutdown, _query_ctxs.size(),
_memory_low_watermark, _memory_high_watermark, _is_shutdown, _query_ctxs.size(),
_scan_bytes_per_second, _remote_scan_bytes_per_second);
}

std::string WorkloadGroup::memory_debug_string() const {
return fmt::format(
"TG[id = {}, name = {}, memory_limit = {}, enable_memory_overcommit = "
"{}, weighted_memory_limit = {}, total_mem_used = {}, "
"wg_refresh_interval_memory_growth = {}, spill_low_watermark = {}, "
"spill_high_watermark = {}, version = {}, is_shutdown = {}, query_num = {}]",
"wg_refresh_interval_memory_growth = {}, memory_low_watermark = {}, "
"memory_high_watermark = {}, version = {}, is_shutdown = {}, query_num = {}]",
_id, _name, PrettyPrinter::print(_memory_limit, TUnit::BYTES),
_enable_memory_overcommit ? "true" : "false",
PrettyPrinter::print(_weighted_memory_limit, TUnit::BYTES),
PrettyPrinter::print(_total_mem_used, TUnit::BYTES),
PrettyPrinter::print(_wg_refresh_interval_memory_growth, TUnit::BYTES),
_spill_low_watermark, _spill_high_watermark, _version, _is_shutdown,
_memory_low_watermark, _memory_high_watermark, _version, _is_shutdown,
_query_ctxs.size());
}

Expand All @@ -137,8 +137,8 @@ void WorkloadGroup::check_and_update(const WorkloadGroupInfo& tg_info) {
_scan_thread_num = tg_info.scan_thread_num;
_max_remote_scan_thread_num = tg_info.max_remote_scan_thread_num;
_min_remote_scan_thread_num = tg_info.min_remote_scan_thread_num;
_spill_low_watermark = tg_info.spill_low_watermark;
_spill_high_watermark = tg_info.spill_high_watermark;
_memory_low_watermark = tg_info.memory_low_watermark;
_memory_high_watermark = tg_info.memory_high_watermark;
_scan_bytes_per_second = tg_info.read_bytes_per_second;
_remote_scan_bytes_per_second = tg_info.remote_read_bytes_per_second;
} else {
Expand Down Expand Up @@ -396,16 +396,16 @@ WorkloadGroupInfo WorkloadGroupInfo::parse_topic_info(
min_remote_scan_thread_num = tworkload_group_info.min_remote_scan_thread_num;
}

// 12 spill low watermark
int spill_low_watermark = SPILL_LOW_WATERMARK_DEFAULT_VALUE;
if (tworkload_group_info.__isset.spill_threshold_low_watermark) {
spill_low_watermark = tworkload_group_info.spill_threshold_low_watermark;
// 12 memory low watermark
int memory_low_watermark = MEMORY_LOW_WATERMARK_DEFAULT_VALUE;
if (tworkload_group_info.__isset.memory_low_watermark) {
memory_low_watermark = tworkload_group_info.memory_low_watermark;
}

// 13 spil high watermark
int spill_high_watermark = SPILL_HIGH_WATERMARK_DEFAULT_VALUE;
if (tworkload_group_info.__isset.spill_threshold_high_watermark) {
spill_high_watermark = tworkload_group_info.spill_threshold_high_watermark;
// 13 memory high watermark
int memory_high_watermark = MEMORY_HIGH_WATERMARK_DEFAULT_VALUE;
if (tworkload_group_info.__isset.memory_high_watermark) {
memory_high_watermark = tworkload_group_info.memory_high_watermark;
}

// 14 scan io
Expand Down Expand Up @@ -433,8 +433,8 @@ WorkloadGroupInfo WorkloadGroupInfo::parse_topic_info(
.scan_thread_num = scan_thread_num,
.max_remote_scan_thread_num = max_remote_scan_thread_num,
.min_remote_scan_thread_num = min_remote_scan_thread_num,
.spill_low_watermark = spill_low_watermark,
.spill_high_watermark = spill_high_watermark,
.memory_low_watermark = memory_low_watermark,
.memory_high_watermark = memory_high_watermark,
.read_bytes_per_second = read_bytes_per_second,
.remote_read_bytes_per_second = remote_read_bytes_per_second};
}
Expand Down
22 changes: 11 additions & 11 deletions be/src/runtime/workload_group/workload_group.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,11 @@ class WorkloadGroup : public std::enable_shared_from_this<WorkloadGroup> {

void do_sweep();

int spill_threshold_low_water_mark() const {
return _spill_low_watermark.load(std::memory_order_relaxed);
int memory_low_watermark() const {
return _memory_low_watermark.load(std::memory_order_relaxed);
}
int spill_threashold_high_water_mark() const {
return _spill_high_watermark.load(std::memory_order_relaxed);
int memory_high_watermark() const {
return _memory_high_watermark.load(std::memory_order_relaxed);
}

void set_weighted_memory_ratio(double ratio);
Expand All @@ -107,7 +107,7 @@ class WorkloadGroup : public std::enable_shared_from_this<WorkloadGroup> {
_total_mem_used + _wg_refresh_interval_memory_growth.load() + size;
if ((realtime_total_mem_used >
((double)_weighted_memory_limit *
_spill_high_watermark.load(std::memory_order_relaxed) / 100))) {
_memory_high_watermark.load(std::memory_order_relaxed) / 100))) {
return false;
} else {
_wg_refresh_interval_memory_growth.fetch_add(size);
Expand All @@ -122,10 +122,10 @@ class WorkloadGroup : public std::enable_shared_from_this<WorkloadGroup> {
auto realtime_total_mem_used = _total_mem_used + _wg_refresh_interval_memory_growth.load();
*is_low_wartermark = (realtime_total_mem_used >
((double)_weighted_memory_limit *
_spill_low_watermark.load(std::memory_order_relaxed) / 100));
_memory_low_watermark.load(std::memory_order_relaxed) / 100));
*is_high_wartermark = (realtime_total_mem_used >
((double)_weighted_memory_limit *
_spill_high_watermark.load(std::memory_order_relaxed) / 100));
_memory_high_watermark.load(std::memory_order_relaxed) / 100));
}

std::string debug_string() const;
Expand Down Expand Up @@ -233,8 +233,8 @@ class WorkloadGroup : public std::enable_shared_from_this<WorkloadGroup> {
std::atomic<int> _scan_thread_num;
std::atomic<int> _max_remote_scan_thread_num;
std::atomic<int> _min_remote_scan_thread_num;
std::atomic<int> _spill_low_watermark;
std::atomic<int> _spill_high_watermark;
std::atomic<int> _memory_low_watermark;
std::atomic<int> _memory_high_watermark;
std::atomic<int64_t> _scan_bytes_per_second {-1};
std::atomic<int64_t> _remote_scan_bytes_per_second {-1};

Expand Down Expand Up @@ -282,8 +282,8 @@ struct WorkloadGroupInfo {
const int scan_thread_num = 0;
const int max_remote_scan_thread_num = 0;
const int min_remote_scan_thread_num = 0;
const int spill_low_watermark = 0;
const int spill_high_watermark = 0;
const int memory_low_watermark = 0;
const int memory_high_watermark = 0;
const int read_bytes_per_second = -1;
const int remote_read_bytes_per_second = -1;
// log cgroup cpu info
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -487,8 +487,8 @@ public class SchemaTable extends Table {
.column("SCAN_THREAD_NUM", ScalarType.createType(PrimitiveType.BIGINT))
.column("MAX_REMOTE_SCAN_THREAD_NUM", ScalarType.createType(PrimitiveType.BIGINT))
.column("MIN_REMOTE_SCAN_THREAD_NUM", ScalarType.createType(PrimitiveType.BIGINT))
.column("SPILL_THRESHOLD_LOW_WATERMARK", ScalarType.createVarchar(256))
.column("SPILL_THRESHOLD_HIGH_WATERMARK", ScalarType.createVarchar(256))
.column("MEMORY_LOW_WATERMARK", ScalarType.createVarchar(256))
.column("MEMORY_HIGH_WATERMARK", ScalarType.createVarchar(256))
.column("TAG", ScalarType.createVarchar(256))
.column("READ_BYTES_PER_SECOND", ScalarType.createType(PrimitiveType.BIGINT))
.column("REMOTE_READ_BYTES_PER_SECOND", ScalarType.createType(PrimitiveType.BIGINT))
Expand Down
Loading

0 comments on commit ee9252a

Please sign in to comment.