From bdca9cce3e3174dfeecb56ada2d3ebe173d311b6 Mon Sep 17 00:00:00 2001
From: wangbo
Date: Tue, 26 Nov 2024 17:09:55 +0800
Subject: [PATCH] [branch-2.1]add internal workload group (#42006) (#44592)

Add an internal workload group when Doris starts; currently it is mainly
used to manage the CPU usage of compaction workloads.

pick #42006
---
 be/src/agent/cgroup_cpu_ctl.cpp               |   6 +-
 be/src/agent/cgroup_cpu_ctl.h                 |   2 +-
 be/src/agent/topic_subscriber.cpp             |   6 +-
 be/src/olap/olap_server.cpp                   |  77 ++++--
 be/src/olap/storage_engine.h                  |   3 +-
 be/src/pipeline/task_scheduler.h              |   4 +-
 be/src/runtime/exec_env_init.cpp              |   4 +-
 .../runtime/workload_group/workload_group.cpp |  94 +++++---
 .../runtime/workload_group/workload_group.h   |  20 +-
 .../workload_group/workload_group_manager.cpp |  23 ++
 .../workload_group/workload_group_manager.h   |  12 +
 be/src/util/threadpool.cpp                    |   7 +-
 be/src/util/threadpool.h                      |   6 +-
 be/src/vec/exec/scan/scanner_scheduler.h      |   9 +-
 .../vec/sink/writer/async_result_writer.cpp   |  12 +
 .../analysis/AlterWorkloadGroupStmt.java      |  20 +-
 .../analysis/CreateWorkloadGroupStmt.java     |  18 +-
 .../doris/analysis/DropWorkloadGroupStmt.java |   3 -
 .../java/org/apache/doris/catalog/Env.java    |   2 +
 .../org/apache/doris/common/FeConstants.java  |   3 +
 .../CreateInternalWorkloadGroupThread.java    |  55 +++++
 .../resource/workloadgroup/WorkloadGroup.java |  81 +++++--
 .../workloadgroup/WorkloadGroupMgr.java       | 109 ++++++---
 .../workloadgroup/WorkloadGroupMgrTest.java   | 222 ++++++++++++++++++
 .../doris/utframe/TestWithFeService.java      |   1 +
 gensrc/thrift/BackendService.thrift           |   3 +
 .../workload_manager_p0/test_curd_wlg.groovy  |  29 +++
 27 files changed, 693 insertions(+), 138 deletions(-)
 create mode 100644 fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/CreateInternalWorkloadGroupThread.java

diff --git a/be/src/agent/cgroup_cpu_ctl.cpp b/be/src/agent/cgroup_cpu_ctl.cpp
index e68535a708c49b..76b72f2c9d00ae 100644
--- a/be/src/agent/cgroup_cpu_ctl.cpp
+++ b/be/src/agent/cgroup_cpu_ctl.cpp
@@ -158,11 +158,11 @@ uint64_t CgroupCpuCtl::cpu_soft_limit_default_value() {
     return _is_enable_cgroup_v2_in_env ? 100 : 1024;
 }
 
-std::unique_ptr<CgroupCpuCtl> CgroupCpuCtl::create_cgroup_cpu_ctl(uint64_t wg_id) {
+std::shared_ptr<CgroupCpuCtl> CgroupCpuCtl::create_cgroup_cpu_ctl(uint64_t wg_id) {
     if (_is_enable_cgroup_v2_in_env) {
-        return std::make_unique<CgroupV2CpuCtl>(wg_id);
+        return std::make_shared<CgroupV2CpuCtl>(wg_id);
     } else if (_is_enable_cgroup_v1_in_env) {
-        return std::make_unique<CgroupV1CpuCtl>(wg_id);
+        return std::make_shared<CgroupV1CpuCtl>(wg_id);
     }
     return nullptr;
 }
diff --git a/be/src/agent/cgroup_cpu_ctl.h b/be/src/agent/cgroup_cpu_ctl.h
index 84e191159f15f1..b23f1f4dd9cadb 100644
--- a/be/src/agent/cgroup_cpu_ctl.h
+++ b/be/src/agent/cgroup_cpu_ctl.h
@@ -52,7 +52,7 @@ class CgroupCpuCtl {
 
     static Status delete_unused_cgroup_path(std::set<uint64_t>& used_wg_ids);
 
-    static std::unique_ptr<CgroupCpuCtl> create_cgroup_cpu_ctl(uint64_t wg_id);
+    static std::shared_ptr<CgroupCpuCtl> create_cgroup_cpu_ctl(uint64_t wg_id);
 
     static bool is_a_valid_cgroup_path(std::string cg_path);
 
diff --git a/be/src/agent/topic_subscriber.cpp b/be/src/agent/topic_subscriber.cpp
index f62bdaef0991c9..b470e1534e1c6f 100644
--- a/be/src/agent/topic_subscriber.cpp
+++ b/be/src/agent/topic_subscriber.cpp
@@ -40,14 +40,12 @@ void TopicSubscriber::handle_topic_info(const TPublishTopicRequest& topic_reques
     // e.g., updating workload info may delay other listeners, so we need to add a thread
     // here to call handle_topic_info asynchronously
     std::shared_lock lock(_listener_mtx);
-    LOG(INFO) << "[topic_publish]begin handle topic info";
     for (auto& listener_pair : _registered_listeners) {
         if (topic_request.topic_map.find(listener_pair.first) != topic_request.topic_map.end()) {
-            LOG(INFO) << "[topic_publish]begin handle topic " << listener_pair.first
-                      << ", size=" << topic_request.topic_map.at(listener_pair.first).size();
             listener_pair.second->handle_topic_info(
                     topic_request.topic_map.at(listener_pair.first));
-            LOG(INFO) << "[topic_publish]finish handle topic " << listener_pair.first;
+            LOG(INFO) << "[topic_publish]finish handle topic " << listener_pair.first
+                      << ", size=" << topic_request.topic_map.at(listener_pair.first).size();
         }
     }
 }
diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp
index ddd35ae2f9213d..f5f796d973b928 100644
--- a/be/src/olap/olap_server.cpp
+++ b/be/src/olap/olap_server.cpp
@@ -125,7 +125,7 @@ static int32_t get_single_replica_compaction_threads_num(size_t data_dirs_num) {
     return threads_num;
 }
 
-Status StorageEngine::start_bg_threads() {
+Status StorageEngine::start_bg_threads(std::shared_ptr<WorkloadGroup> wg_sptr) {
     RETURN_IF_ERROR(Thread::create(
             "StorageEngine", "unused_rowset_monitor_thread",
             [this]() { this->_unused_rowset_monitor_thread_callback(); },
@@ -155,29 +155,60 @@ Status StorageEngine::start_bg_threads() {
     auto single_replica_compaction_threads =
             get_single_replica_compaction_threads_num(data_dirs.size());
 
-    RETURN_IF_ERROR(ThreadPoolBuilder("BaseCompactionTaskThreadPool")
-                            .set_min_threads(base_compaction_threads)
-                            .set_max_threads(base_compaction_threads)
-                            .build(&_base_compaction_thread_pool));
-    RETURN_IF_ERROR(ThreadPoolBuilder("CumuCompactionTaskThreadPool")
-                            .set_min_threads(cumu_compaction_threads)
-                            .set_max_threads(cumu_compaction_threads)
-                            .build(&_cumu_compaction_thread_pool));
-    RETURN_IF_ERROR(ThreadPoolBuilder("SingleReplicaCompactionTaskThreadPool")
-                            .set_min_threads(single_replica_compaction_threads)
-                            .set_max_threads(single_replica_compaction_threads)
-                            .build(&_single_replica_compaction_thread_pool));
-
-    if (config::enable_segcompaction) {
-        RETURN_IF_ERROR(ThreadPoolBuilder("SegCompactionTaskThreadPool")
-                                .set_min_threads(config::segcompaction_num_threads)
-                                .set_max_threads(config::segcompaction_num_threads)
-                                .build(&_seg_compaction_thread_pool));
+    if (wg_sptr && wg_sptr->get_cgroup_cpu_ctl_wptr().lock()) {
+        RETURN_IF_ERROR(ThreadPoolBuilder("gBaseCompactionTaskThreadPool")
+                                .set_min_threads(base_compaction_threads)
+                                .set_max_threads(base_compaction_threads)
+                                .set_cgroup_cpu_ctl(wg_sptr->get_cgroup_cpu_ctl_wptr())
+                                .build(&_base_compaction_thread_pool));
+        RETURN_IF_ERROR(ThreadPoolBuilder("gCumuCompactionTaskThreadPool")
+                                .set_min_threads(cumu_compaction_threads)
+                                .set_max_threads(cumu_compaction_threads)
+                                .set_cgroup_cpu_ctl(wg_sptr->get_cgroup_cpu_ctl_wptr())
+                                .build(&_cumu_compaction_thread_pool));
+        RETURN_IF_ERROR(ThreadPoolBuilder("gSingleReplicaCompactionTaskThreadPool")
+                                .set_min_threads(single_replica_compaction_threads)
+                                .set_max_threads(single_replica_compaction_threads)
+                                .set_cgroup_cpu_ctl(wg_sptr->get_cgroup_cpu_ctl_wptr())
+                                .build(&_single_replica_compaction_thread_pool));
+
+        if (config::enable_segcompaction) {
+            RETURN_IF_ERROR(ThreadPoolBuilder("gSegCompactionTaskThreadPool")
+                                    .set_min_threads(config::segcompaction_num_threads)
+                                    .set_max_threads(config::segcompaction_num_threads)
+                                    .set_cgroup_cpu_ctl(wg_sptr->get_cgroup_cpu_ctl_wptr())
+                                    .build(&_seg_compaction_thread_pool));
+        }
+        RETURN_IF_ERROR(ThreadPoolBuilder("gColdDataCompactionTaskThreadPool")
+                                .set_min_threads(config::cold_data_compaction_thread_num)
+                                .set_max_threads(config::cold_data_compaction_thread_num)
+                                .set_cgroup_cpu_ctl(wg_sptr->get_cgroup_cpu_ctl_wptr())
+                                .build(&_cold_data_compaction_thread_pool));
+    } else {
+        RETURN_IF_ERROR(ThreadPoolBuilder("BaseCompactionTaskThreadPool")
+                                .set_min_threads(base_compaction_threads)
+                                .set_max_threads(base_compaction_threads)
+                                .build(&_base_compaction_thread_pool));
+        RETURN_IF_ERROR(ThreadPoolBuilder("CumuCompactionTaskThreadPool")
+                                .set_min_threads(cumu_compaction_threads)
+                                .set_max_threads(cumu_compaction_threads)
+                                .build(&_cumu_compaction_thread_pool));
+        RETURN_IF_ERROR(ThreadPoolBuilder("SingleReplicaCompactionTaskThreadPool")
+                                .set_min_threads(single_replica_compaction_threads)
+                                .set_max_threads(single_replica_compaction_threads)
+                                .build(&_single_replica_compaction_thread_pool));
+
+        if (config::enable_segcompaction) {
+            RETURN_IF_ERROR(ThreadPoolBuilder("SegCompactionTaskThreadPool")
+                                    .set_min_threads(config::segcompaction_num_threads)
+                                    .set_max_threads(config::segcompaction_num_threads)
+                                    .build(&_seg_compaction_thread_pool));
+        }
+        RETURN_IF_ERROR(ThreadPoolBuilder("ColdDataCompactionTaskThreadPool")
+                                .set_min_threads(config::cold_data_compaction_thread_num)
+                                .set_max_threads(config::cold_data_compaction_thread_num)
+                                .build(&_cold_data_compaction_thread_pool));
     }
-    RETURN_IF_ERROR(ThreadPoolBuilder("ColdDataCompactionTaskThreadPool")
-                            .set_min_threads(config::cold_data_compaction_thread_num)
-                            .set_max_threads(config::cold_data_compaction_thread_num)
-                            .build(&_cold_data_compaction_thread_pool));
 
     // compaction tasks producer thread
     RETURN_IF_ERROR(Thread::create(
diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h
index 560b246274b528..fcd993dd1babed 100644
--- a/be/src/olap/storage_engine.h
+++ b/be/src/olap/storage_engine.h
@@ -71,6 +71,7 @@ class TxnManager;
 class ReportWorker;
 class CreateTabletIdxCache;
 struct DirInfo;
+class WorkloadGroup;
 
 using SegCompactionCandidates = std::vector<segment_v2::SegmentSharedPtr>;
 using SegCompactionCandidatesSharedPtr = std::shared_ptr<SegCompactionCandidates>;
@@ -171,7 +172,7 @@ class StorageEngine {
     }
 
     // start all background threads. This should be called after env is ready.
-    Status start_bg_threads();
+    Status start_bg_threads(std::shared_ptr<WorkloadGroup> wg_sptr = nullptr);
 
     // clear trash and snapshot file
     // option: update disk usage after sweep
diff --git a/be/src/pipeline/task_scheduler.h b/be/src/pipeline/task_scheduler.h
index c33103bfd30fec..55aa31fc7694d7 100644
--- a/be/src/pipeline/task_scheduler.h
+++ b/be/src/pipeline/task_scheduler.h
@@ -76,7 +76,7 @@ class TaskScheduler {
 public:
     TaskScheduler(ExecEnv* exec_env, std::shared_ptr<BlockedTaskScheduler> b_scheduler,
                   std::shared_ptr<TaskQueue> task_queue, std::string name,
-                  CgroupCpuCtl* cgroup_cpu_ctl)
+                  std::shared_ptr<CgroupCpuCtl> cgroup_cpu_ctl)
             : _task_queue(std::move(task_queue)),
               _blocked_task_scheduler(std::move(b_scheduler)),
              _shutdown(false),
@@ -102,7 +102,7 @@ class TaskScheduler {
     std::shared_ptr<BlockedTaskScheduler> _blocked_task_scheduler;
     std::atomic<bool> _shutdown;
     std::string _name;
-    CgroupCpuCtl* _cgroup_cpu_ctl = nullptr;
+    std::weak_ptr<CgroupCpuCtl> _cgroup_cpu_ctl;
 
     void _do_work(size_t index);
 };
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index e47e26e8f6b034..5f54395a66f6ac 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -222,6 +222,7 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths,
     _pipeline_tracer_ctx = std::make_unique<pipeline::PipelineTracerContext>(); // before query
     RETURN_IF_ERROR(init_pipeline_task_scheduler());
     _workload_group_manager = new WorkloadGroupMgr();
+    _workload_group_manager->init_internal_workload_group();
     _scanner_scheduler = new doris::vectorized::ScannerScheduler();
     _fragment_mgr = new FragmentMgr(this);
     _result_cache = new ResultCache(config::query_cache_max_size_mb,
@@ -295,7 +296,8 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths,
         return st;
     }
     _storage_engine->set_heartbeat_flags(this->heartbeat_flags());
-    if (st = _storage_engine->start_bg_threads(); !st.ok()) {
+    WorkloadGroupPtr internal_wg = _workload_group_manager->get_internal_wg();
+    if (st = _storage_engine->start_bg_threads(internal_wg); !st.ok()) {
         LOG(ERROR) << "Failed to start bg threads of storage engine, res=" << st;
         return st;
     }
diff --git a/be/src/runtime/workload_group/workload_group.cpp b/be/src/runtime/workload_group/workload_group.cpp
index 971750eb097aa1..07d4177f7f6a5c 100644
--- a/be/src/runtime/workload_group/workload_group.cpp
+++ b/be/src/runtime/workload_group/workload_group.cpp
@@ -50,7 +50,9 @@ const static int CPU_HARD_LIMIT_DEFAULT_VALUE = -1;
 const static int SPILL_LOW_WATERMARK_DEFAULT_VALUE = 50;
 const static int SPILL_HIGH_WATERMARK_DEFAULT_VALUE = 80;
 
-WorkloadGroup::WorkloadGroup(const WorkloadGroupInfo& tg_info)
+WorkloadGroup::WorkloadGroup(const WorkloadGroupInfo& wg_info) : WorkloadGroup(wg_info, true) {}
+
+WorkloadGroup::WorkloadGroup(const WorkloadGroupInfo& tg_info, bool need_create_query_thread_pool)
         : _id(tg_info.id),
           _name(tg_info.name),
           _version(tg_info.version),
@@ -65,7 +67,8 @@ WorkloadGroup::WorkloadGroup(const WorkloadGroupInfo& tg_info)
           _spill_low_watermark(tg_info.spill_low_watermark),
           _spill_high_watermark(tg_info.spill_high_watermark),
           _scan_bytes_per_second(tg_info.read_bytes_per_second),
-          _remote_scan_bytes_per_second(tg_info.remote_read_bytes_per_second) {
+          _remote_scan_bytes_per_second(tg_info.remote_read_bytes_per_second),
+          _need_create_query_thread_pool(need_create_query_thread_pool) {
     std::vector<DataDirInfo>& data_dir_list = io::BeConfDataDirReader::be_config_data_dir_list;
     for (const auto& data_dir : data_dir_list) {
         _scan_io_throttle_map[data_dir.path] =
@@ -419,35 +422,42 @@ Status WorkloadGroupInfo::parse_topic_info(const
TWorkloadGroupInfo& tworkload_g return Status::OK(); } -void WorkloadGroup::upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* exec_env) { - uint64_t tg_id = tg_info->id; - std::string tg_name = tg_info->name; - int cpu_hard_limit = tg_info->cpu_hard_limit; - uint64_t cpu_shares = tg_info->cpu_share; - bool enable_cpu_hard_limit = tg_info->enable_cpu_hard_limit; - int scan_thread_num = tg_info->scan_thread_num; - int max_remote_scan_thread_num = tg_info->max_remote_scan_thread_num; - int min_remote_scan_thread_num = tg_info->min_remote_scan_thread_num; +std::weak_ptr WorkloadGroup::get_cgroup_cpu_ctl_wptr() { + std::shared_lock rlock(_task_sched_lock); + return _cgroup_cpu_ctl; +} +void WorkloadGroup::create_cgroup_cpu_ctl() { std::lock_guard wlock(_task_sched_lock); + create_cgroup_cpu_ctl_no_lock(); +} + +void WorkloadGroup::create_cgroup_cpu_ctl_no_lock() { if (config::doris_cgroup_cpu_path != "" && _cgroup_cpu_ctl == nullptr) { - std::unique_ptr cgroup_cpu_ctl = CgroupCpuCtl::create_cgroup_cpu_ctl(tg_id); + std::shared_ptr cgroup_cpu_ctl = CgroupCpuCtl::create_cgroup_cpu_ctl(_id); if (cgroup_cpu_ctl) { Status ret = cgroup_cpu_ctl->init(); if (ret.ok()) { _cgroup_cpu_ctl = std::move(cgroup_cpu_ctl); - LOG(INFO) << "[upsert wg thread pool] cgroup init success, wg_id=" << tg_id; + LOG(INFO) << "[upsert wg thread pool] cgroup init success, wg_id=" << _id; } else { - LOG(INFO) << "[upsert wg thread pool] cgroup init failed, wg_id=" << tg_id + LOG(INFO) << "[upsert wg thread pool] cgroup init failed, wg_id=" << _id << ", reason=" << ret.to_string(); } } else { - LOG(INFO) << "[upsert wg thread pool] create cgroup cpu ctl for " << tg_id << " failed"; + LOG(INFO) << "[upsert wg thread pool] create cgroup cpu ctl wg_id=" << _id << " failed"; } } +} - CgroupCpuCtl* cg_cpu_ctl_ptr = _cgroup_cpu_ctl.get(); - +void WorkloadGroup::upsert_thread_pool_no_lock(WorkloadGroupInfo* wg_info, + std::shared_ptr cg_cpu_ctl_ptr, + ExecEnv* exec_env) { + uint64_t wg_id = wg_info->id; + std::string wg_name = wg_info->name; + int scan_thread_num = wg_info->scan_thread_num; + int max_remote_scan_thread_num = wg_info->max_remote_scan_thread_num; + int min_remote_scan_thread_num = wg_info->min_remote_scan_thread_num; if (_task_sched == nullptr) { int32_t executors_size = config::pipeline_executor_size; if (executors_size <= 0) { @@ -457,18 +467,18 @@ void WorkloadGroup::upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* e std::unique_ptr pipeline_task_scheduler = std::make_unique( exec_env, exec_env->get_global_block_scheduler(), std::move(task_queue), - "Pipe_" + tg_name, cg_cpu_ctl_ptr); + "Pipe_" + wg_name, cg_cpu_ctl_ptr); Status ret = pipeline_task_scheduler->start(); if (ret.ok()) { _task_sched = std::move(pipeline_task_scheduler); } else { - LOG(INFO) << "[upsert wg thread pool] task scheduler start failed, gid= " << tg_id; + LOG(INFO) << "[upsert wg thread pool] task scheduler start failed, gid= " << wg_id; } } if (_scan_task_sched == nullptr) { std::unique_ptr scan_scheduler = - std::make_unique("Scan_" + tg_name, + std::make_unique("Scan_" + wg_name, cg_cpu_ctl_ptr); Status ret = scan_scheduler->start(config::doris_scanner_thread_pool_thread_num, config::doris_scanner_thread_pool_thread_num, @@ -476,34 +486,33 @@ void WorkloadGroup::upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* e if (ret.ok()) { _scan_task_sched = std::move(scan_scheduler); } else { - LOG(INFO) << "[upsert wg thread pool] scan scheduler start failed, gid=" << tg_id; + LOG(INFO) << "[upsert wg thread pool] scan 
scheduler start failed, gid=" << wg_id; } } if (scan_thread_num > 0 && _scan_task_sched) { _scan_task_sched->reset_thread_num(scan_thread_num, scan_thread_num); } - if (_remote_scan_task_sched == nullptr) { int remote_max_thread_num = vectorized::ScannerScheduler::get_remote_scan_thread_num(); int remote_scan_thread_queue_size = vectorized::ScannerScheduler::get_remote_scan_thread_queue_size(); std::unique_ptr remote_scan_scheduler = - std::make_unique("RScan_" + tg_name, + std::make_unique("RScan_" + wg_name, cg_cpu_ctl_ptr); - Status ret = remote_scan_scheduler->start(remote_max_thread_num, remote_max_thread_num, + Status ret = remote_scan_scheduler->start(remote_max_thread_num, + config::doris_scanner_min_thread_pool_thread_num, remote_scan_thread_queue_size); if (ret.ok()) { _remote_scan_task_sched = std::move(remote_scan_scheduler); } else { LOG(INFO) << "[upsert wg thread pool] remote scan scheduler start failed, gid=" - << tg_id; + << wg_id; } } if (max_remote_scan_thread_num >= min_remote_scan_thread_num && _remote_scan_task_sched) { _remote_scan_task_sched->reset_thread_num(max_remote_scan_thread_num, min_remote_scan_thread_num); } - if (_memtable_flush_pool == nullptr) { int num_disk = ExecEnv::GetInstance()->get_storage_engine()->get_disk_num(); // -1 means disk num may not be inited, so not create flush pool @@ -512,7 +521,7 @@ void WorkloadGroup::upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* e size_t min_threads = std::max(1, config::wg_flush_thread_num_per_store); size_t max_threads = num_disk * min_threads; - std::string pool_name = "wg_flush_" + tg_name; + std::string pool_name = "wg_flush_" + wg_name; auto ret = ThreadPoolBuilder(pool_name) .set_min_threads(min_threads) .set_max_threads(max_threads) @@ -520,17 +529,24 @@ void WorkloadGroup::upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* e .build(&thread_pool); if (!ret.ok()) { LOG(INFO) << "[upsert wg thread pool] create " + pool_name + " failed, gid=" - << tg_id; + << wg_id; } else { _memtable_flush_pool = std::move(thread_pool); - LOG(INFO) << "[upsert wg thread pool] create " + pool_name + " succ, gid=" << tg_id + LOG(INFO) << "[upsert wg thread pool] create " + pool_name + " succ, gid=" << wg_id << ", max thread num=" << max_threads << ", min thread num=" << min_threads; } } } +} + +void WorkloadGroup::upsert_cgroup_cpu_ctl_no_lock(WorkloadGroupInfo* wg_info) { + uint64_t wg_id = wg_info->id; + int cpu_hard_limit = wg_info->cpu_hard_limit; + uint64_t cpu_shares = wg_info->cpu_share; + bool enable_cpu_hard_limit = wg_info->enable_cpu_hard_limit; + create_cgroup_cpu_ctl_no_lock(); - // step 6: update cgroup cpu if needed if (_cgroup_cpu_ctl) { if (enable_cpu_hard_limit) { if (cpu_hard_limit > 0) { @@ -538,16 +554,26 @@ void WorkloadGroup::upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* e _cgroup_cpu_ctl->update_cpu_soft_limit( CgroupCpuCtl::cpu_soft_limit_default_value()); } else { - LOG(INFO) << "[upsert wg thread pool] enable cpu hard limit but value is illegal: " - << cpu_hard_limit << ", gid=" << tg_id; + LOG(INFO) << "[upsert wg thread pool] enable cpu hard limit but value is " + "illegal: " + << cpu_hard_limit << ", gid=" << wg_id; } } else { _cgroup_cpu_ctl->update_cpu_soft_limit(cpu_shares); _cgroup_cpu_ctl->update_cpu_hard_limit( CPU_HARD_LIMIT_DEFAULT_VALUE); // disable cpu hard limit } - _cgroup_cpu_ctl->get_cgroup_cpu_info(&(tg_info->cgroup_cpu_shares), - &(tg_info->cgroup_cpu_hard_limit)); + _cgroup_cpu_ctl->get_cgroup_cpu_info(&(wg_info->cgroup_cpu_shares), + 
&(wg_info->cgroup_cpu_hard_limit));
+    }
+}
+
+void WorkloadGroup::upsert_task_scheduler(WorkloadGroupInfo* wg_info, ExecEnv* exec_env) {
+    std::lock_guard wlock(_task_sched_lock);
+    upsert_cgroup_cpu_ctl_no_lock(wg_info);
+
+    if (_need_create_query_thread_pool) {
+        upsert_thread_pool_no_lock(wg_info, _cgroup_cpu_ctl, exec_env);
     }
 }
diff --git a/be/src/runtime/workload_group/workload_group.h b/be/src/runtime/workload_group/workload_group.h
index 5d6b201eaab2cd..22b1405eeab624 100644
--- a/be/src/runtime/workload_group/workload_group.h
+++ b/be/src/runtime/workload_group/workload_group.h
@@ -59,6 +59,8 @@ class WorkloadGroup : public std::enable_shared_from_this<WorkloadGroup> {
 public:
     explicit WorkloadGroup(const WorkloadGroupInfo& tg_info);
 
+    explicit WorkloadGroup(const WorkloadGroupInfo& tg_info, bool need_create_query_thread_pool);
+
     int64_t version() const { return _version; }
 
     uint64_t cpu_share() const { return _cpu_share.load(); }
@@ -210,7 +212,17 @@ class WorkloadGroup : public std::enable_shared_from_this<WorkloadGroup> {
     }
 
     int64_t get_remote_scan_bytes_per_second();
 
+    void create_cgroup_cpu_ctl();
+
+    std::weak_ptr<CgroupCpuCtl> get_cgroup_cpu_ctl_wptr();
+
 private:
+    void create_cgroup_cpu_ctl_no_lock();
+    void upsert_cgroup_cpu_ctl_no_lock(WorkloadGroupInfo* wg_info);
+    void upsert_thread_pool_no_lock(WorkloadGroupInfo* wg_info,
+                                    std::shared_ptr<CgroupCpuCtl> cg_cpu_ctl_ptr,
+                                    ExecEnv* exec_env);
+
     mutable std::shared_mutex _mutex; // lock _name, _version, _cpu_share, _memory_limit
     const uint64_t _id;
     std::string _name;
@@ -241,7 +253,10 @@ class WorkloadGroup : public std::enable_shared_from_this<WorkloadGroup> {
     std::unordered_map<TUniqueId, std::weak_ptr<QueryContext>> _query_ctxs;
 
     std::shared_mutex _task_sched_lock;
-    std::unique_ptr<CgroupCpuCtl> _cgroup_cpu_ctl {nullptr};
+    // _cgroup_cpu_ctl is used not only by the thread pools managed by this WorkloadGroup,
+    // but also by some global background thread pools that are not owned by the
+    // WorkloadGroup, so it should be a shared_ptr.
+    std::shared_ptr<CgroupCpuCtl> _cgroup_cpu_ctl {nullptr};
     std::unique_ptr<pipeline::TaskScheduler> _task_sched {nullptr};
     std::unique_ptr<vectorized::SimplifiedScanScheduler> _scan_task_sched {nullptr};
     std::unique_ptr<vectorized::SimplifiedScanScheduler> _remote_scan_task_sched {nullptr};
@@ -250,6 +265,9 @@ class WorkloadGroup : public std::enable_shared_from_this<WorkloadGroup> {
     std::map<std::string, std::shared_ptr<IOThrottle>> _scan_io_throttle_map;
     std::shared_ptr<IOThrottle> _remote_scan_io_throttle {nullptr};
 
+    // some background workloads do not need to create the query thread pools
+    const bool _need_create_query_thread_pool;
+
     // bvar metric
     std::unique_ptr<bvar::Status<int64_t>> _mem_used_status;
     std::unique_ptr<bvar::Adder<uint64_t>> _cpu_usage_adder;
diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp b/be/src/runtime/workload_group/workload_group_manager.cpp
index 145754dd357658..bb9757c94c309f 100644
--- a/be/src/runtime/workload_group/workload_group_manager.cpp
+++ b/be/src/runtime/workload_group/workload_group_manager.cpp
@@ -33,6 +33,25 @@ namespace doris {
 
+void WorkloadGroupMgr::init_internal_workload_group() {
+    WorkloadGroupPtr internal_wg = nullptr;
+    {
+        std::lock_guard w_lock(_group_mutex);
+        if (_workload_groups.find(INTERNAL_WORKLOAD_GROUP_ID) == _workload_groups.end()) {
+            WorkloadGroupInfo internal_wg_info {
+                    .id = INTERNAL_WORKLOAD_GROUP_ID,
+                    .name = INTERNAL_WORKLOAD_GROUP_NAME,
+                    .cpu_share = CgroupCpuCtl::cpu_soft_limit_default_value()};
+            internal_wg = std::make_shared<WorkloadGroup>(internal_wg_info, false);
+            _workload_groups[internal_wg_info.id] = internal_wg;
+        }
+    }
+    DCHECK(internal_wg != nullptr);
+    if (internal_wg) {
+        internal_wg->create_cgroup_cpu_ctl();
+    }
+}
+
 WorkloadGroupPtr WorkloadGroupMgr::get_or_create_workload_group(
         const WorkloadGroupInfo& workload_group_info) {
     {
@@ -85,6 +104,10 @@ void WorkloadGroupMgr::delete_workload_group_by_ids(std::set used_wg_i
     old_wg_size = _workload_groups.size();
     for (auto iter = _workload_groups.begin(); iter != _workload_groups.end(); iter++) {
         uint64_t wg_id = iter->first;
+        // the internal workload group created by BE cannot be dropped
+        if (wg_id == INTERNAL_WORKLOAD_GROUP_ID) {
+            continue;
+        }
         auto workload_group_ptr = iter->second;
         if (used_wg_id.find(wg_id) == used_wg_id.end()) {
             workload_group_ptr->shutdown();
diff --git a/be/src/runtime/workload_group/workload_group_manager.h b/be/src/runtime/workload_group/workload_group_manager.h
index d8547c3383e219..52624f05fdf1c9 100644
--- a/be/src/runtime/workload_group/workload_group_manager.h
+++ b/be/src/runtime/workload_group/workload_group_manager.h
@@ -36,11 +36,18 @@ class TaskScheduler;
 class MultiCoreTaskQueue;
 } // namespace pipeline
 
+// The internal group is used for Doris's internal workloads, currently mainly compaction.
+const static uint64_t INTERNAL_WORKLOAD_GROUP_ID =
+        static_cast<uint64_t>(TWorkloadType::type::INTERNAL);
+const static std::string INTERNAL_WORKLOAD_GROUP_NAME = "_internal";
+
 class WorkloadGroupMgr {
 public:
     WorkloadGroupMgr() = default;
     ~WorkloadGroupMgr() = default;
 
+    void init_internal_workload_group();
+
     WorkloadGroupPtr get_or_create_workload_group(const WorkloadGroupInfo& workload_group_info);
 
     void get_related_workload_groups(const std::function<bool(const WorkloadGroupPtr& ptr)>& pred,
@@ -62,6 +69,11 @@ class WorkloadGroupMgr {
 
     void get_wg_resource_usage(vectorized::Block* block);
 
+    WorkloadGroupPtr get_internal_wg() {
+        std::shared_lock r_lock(_group_mutex);
+        return _workload_groups[INTERNAL_WORKLOAD_GROUP_ID];
+    }
+
 private:
     std::shared_mutex _group_mutex;
     std::unordered_map<uint64_t, WorkloadGroupPtr> _workload_groups;
diff --git a/be/src/util/threadpool.cpp b/be/src/util/threadpool.cpp
index 15fb36181d4336..f5ea38515def36 100644
--- a/be/src/util/threadpool.cpp
+++ b/be/src/util/threadpool.cpp
@@ -75,7 +75,8 @@ ThreadPoolBuilder& ThreadPoolBuilder::set_max_queue_size(int max_queue_size) {
     return *this;
 }
 
-ThreadPoolBuilder& ThreadPoolBuilder::set_cgroup_cpu_ctl(CgroupCpuCtl* cgroup_cpu_ctl) {
+ThreadPoolBuilder& ThreadPoolBuilder::set_cgroup_cpu_ctl(
+        std::weak_ptr<CgroupCpuCtl> cgroup_cpu_ctl) {
     _cgroup_cpu_ctl = cgroup_cpu_ctl;
     return *this;
 }
@@ -476,8 +477,8 @@ void ThreadPool::dispatch_thread() {
     _num_threads++;
     _num_threads_pending_start--;
 
-    if (_cgroup_cpu_ctl != nullptr) {
-        static_cast<void>(_cgroup_cpu_ctl->add_thread_to_cgroup());
+    if (std::shared_ptr<CgroupCpuCtl> cg_cpu_ctl_sptr = _cgroup_cpu_ctl.lock()) {
+        static_cast<void>(cg_cpu_ctl_sptr->add_thread_to_cgroup());
     }
 
     // Owned by this worker thread and added/removed from _idle_threads as needed.
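The hunk above is the core of the BE-side lifetime change: the WorkloadGroup owns the
CgroupCpuCtl through a shared_ptr, while thread pools only keep a weak_ptr and lock it
before registering a worker thread in the cgroup. A minimal, self-contained sketch of
that pattern follows; the type and member names are illustrative stand-ins, not the
actual Doris API.

#include <iostream>
#include <memory>

// Illustrative stand-in for CgroupCpuCtl.
struct CpuCtlSketch {
    void add_thread_to_cgroup() { std::cout << "worker registered in cgroup\n"; }
};

// Illustrative stand-in for ThreadPool / TaskScheduler.
struct PoolSketch {
    std::weak_ptr<CpuCtlSketch> cgroup_cpu_ctl;

    void dispatch_thread() {
        // lock() yields nullptr once the owning workload group is gone,
        // so a late worker simply skips the cgroup registration.
        if (auto ctl = cgroup_cpu_ctl.lock()) {
            ctl->add_thread_to_cgroup();
        }
    }
};

int main() {
    auto ctl = std::make_shared<CpuCtlSketch>(); // owned by the workload group
    PoolSketch pool {ctl};
    pool.dispatch_thread(); // group alive: thread joins the cgroup
    ctl.reset();            // group dropped
    pool.dispatch_thread(); // safe no-op instead of a dangling raw-pointer call
}

This is why the builder now takes std::weak_ptr<CgroupCpuCtl> instead of a raw pointer:
global pools such as the compaction pools can outlive any particular workload group.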
diff --git a/be/src/util/threadpool.h b/be/src/util/threadpool.h index 5ce27e2f27b9a5..9bd4a7246fb0b1 100644 --- a/be/src/util/threadpool.h +++ b/be/src/util/threadpool.h @@ -107,7 +107,7 @@ class ThreadPoolBuilder { ThreadPoolBuilder& set_min_threads(int min_threads); ThreadPoolBuilder& set_max_threads(int max_threads); ThreadPoolBuilder& set_max_queue_size(int max_queue_size); - ThreadPoolBuilder& set_cgroup_cpu_ctl(CgroupCpuCtl* cgroup_cpu_ctl); + ThreadPoolBuilder& set_cgroup_cpu_ctl(std::weak_ptr cgroup_cpu_ctl); template ThreadPoolBuilder& set_idle_timeout(const std::chrono::duration& idle_timeout) { _idle_timeout = std::chrono::duration_cast(idle_timeout); @@ -133,7 +133,7 @@ class ThreadPoolBuilder { int _min_threads; int _max_threads; int _max_queue_size; - CgroupCpuCtl* _cgroup_cpu_ctl = nullptr; + std::weak_ptr _cgroup_cpu_ctl; std::chrono::milliseconds _idle_timeout; ThreadPoolBuilder(const ThreadPoolBuilder&) = delete; @@ -345,7 +345,7 @@ class ThreadPool { // Protected by _lock. int _total_queued_tasks; - CgroupCpuCtl* _cgroup_cpu_ctl = nullptr; + std::weak_ptr _cgroup_cpu_ctl; // All allocated tokens. // diff --git a/be/src/vec/exec/scan/scanner_scheduler.h b/be/src/vec/exec/scan/scanner_scheduler.h index 56c49368598adc..7731b3ba8f983b 100644 --- a/be/src/vec/exec/scan/scanner_scheduler.h +++ b/be/src/vec/exec/scan/scanner_scheduler.h @@ -114,11 +114,8 @@ struct SimplifiedScanTask { class SimplifiedScanScheduler { public: - SimplifiedScanScheduler(std::string sched_name, CgroupCpuCtl* cgroup_cpu_ctl) { - _is_stop.store(false); - _cgroup_cpu_ctl = cgroup_cpu_ctl; - _sched_name = sched_name; - } + SimplifiedScanScheduler(std::string sched_name, std::shared_ptr cgroup_cpu_ctl) + : _is_stop(false), _cgroup_cpu_ctl(cgroup_cpu_ctl), _sched_name(sched_name) {} ~SimplifiedScanScheduler() { stop(); @@ -217,7 +214,7 @@ class SimplifiedScanScheduler { private: std::unique_ptr _scan_thread_pool; std::atomic _is_stop; - CgroupCpuCtl* _cgroup_cpu_ctl = nullptr; + std::weak_ptr _cgroup_cpu_ctl; std::string _sched_name; }; diff --git a/be/src/vec/sink/writer/async_result_writer.cpp b/be/src/vec/sink/writer/async_result_writer.cpp index c83c66f241cade..2d49a0e8978988 100644 --- a/be/src/vec/sink/writer/async_result_writer.cpp +++ b/be/src/vec/sink/writer/async_result_writer.cpp @@ -113,6 +113,18 @@ void AsyncResultWriter::process_block(RuntimeState* state, RuntimeProfile* profi force_close(status); } + if (state && state->get_query_ctx() && state->get_query_ctx()->workload_group()) { + if (auto cg_ctl_sptr = + state->get_query_ctx()->workload_group()->get_cgroup_cpu_ctl_wptr().lock()) { + Status ret = cg_ctl_sptr->add_thread_to_cgroup(); + if (ret.ok()) { + std::string wg_tname = + "asyc_wr_" + state->get_query_ctx()->workload_group()->name(); + Thread::set_self_name(wg_tname); + } + } + } + if (_writer_status.ok()) { while (true) { ThreadCpuStopWatch cpu_time_stop_watch; diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterWorkloadGroupStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterWorkloadGroupStmt.java index f61203383331b4..becca898b64352 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterWorkloadGroupStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterWorkloadGroupStmt.java @@ -25,6 +25,10 @@ import org.apache.doris.common.util.PrintableMap; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.qe.ConnectContext; +import org.apache.doris.resource.workloadgroup.WorkloadGroup; +import 
org.apache.doris.resource.workloadgroup.WorkloadGroupMgr; + +import org.apache.commons.lang3.StringUtils; import java.util.Map; @@ -55,14 +59,26 @@ public void analyze(Analyzer analyzer) throws UserException { } if (properties == null || properties.isEmpty()) { - throw new AnalysisException("Resource group properties can't be null"); + throw new AnalysisException("Workload Group properties can't be empty"); + } + + if (properties.containsKey(WorkloadGroup.INTERNAL_TYPE)) { + throw new AnalysisException(WorkloadGroup.INTERNAL_TYPE + " can not be create or modified "); + } + + String tagStr = properties.get(WorkloadGroup.TAG); + if (!StringUtils.isEmpty(tagStr) && (WorkloadGroupMgr.DEFAULT_GROUP_NAME.equals(workloadGroupName) + || WorkloadGroupMgr.INTERNAL_GROUP_NAME.equals(workloadGroupName))) { + throw new AnalysisException( + WorkloadGroupMgr.INTERNAL_GROUP_NAME + " and " + WorkloadGroupMgr.DEFAULT_GROUP_NAME + + " group can not set tag"); } } @Override public String toSql() { StringBuilder sb = new StringBuilder(); - sb.append("ALTER RESOURCE GROUP '").append(workloadGroupName).append("' "); + sb.append("ALTER WORKLOAD GROUP '").append(workloadGroupName).append("' "); sb.append("PROPERTIES(").append(new PrintableMap<>(properties, " = ", true, false)).append(")"); return sb.toString(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateWorkloadGroupStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateWorkloadGroupStmt.java index 92a60a94e55289..4c0c675ea0037c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateWorkloadGroupStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateWorkloadGroupStmt.java @@ -27,6 +27,9 @@ import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.qe.ConnectContext; import org.apache.doris.resource.workloadgroup.WorkloadGroup; +import org.apache.doris.resource.workloadgroup.WorkloadGroupMgr; + +import org.apache.commons.lang3.StringUtils; import java.util.Map; @@ -68,12 +71,19 @@ public void analyze(Analyzer analyzer) throws UserException { FeNameFormat.checkWorkloadGroupName(workloadGroupName); if (properties == null || properties.isEmpty()) { - throw new AnalysisException("Resource group properties can't be null"); + throw new AnalysisException("Workload Group properties can't be empty"); + } + + if (properties.containsKey(WorkloadGroup.INTERNAL_TYPE)) { + throw new AnalysisException(WorkloadGroup.INTERNAL_TYPE + " can not be create or modified "); } - String wgTag = properties.get(WorkloadGroup.TAG); - if (wgTag != null) { - FeNameFormat.checkCommonName("workload group tag", wgTag); + String tagStr = properties.get(WorkloadGroup.TAG); + if (!StringUtils.isEmpty(tagStr) && (WorkloadGroupMgr.DEFAULT_GROUP_NAME.equals(workloadGroupName) + || WorkloadGroupMgr.INTERNAL_GROUP_NAME.equals(workloadGroupName))) { + throw new AnalysisException( + WorkloadGroupMgr.INTERNAL_GROUP_NAME + " and " + WorkloadGroupMgr.DEFAULT_GROUP_NAME + + " group can not set tag"); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/DropWorkloadGroupStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/DropWorkloadGroupStmt.java index d7a1703771c5cd..f5ffb6f2cd2a01 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/DropWorkloadGroupStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/DropWorkloadGroupStmt.java @@ -20,7 +20,6 @@ import org.apache.doris.catalog.Env; import org.apache.doris.common.ErrorCode; import 
org.apache.doris.common.ErrorReport; -import org.apache.doris.common.FeNameFormat; import org.apache.doris.common.UserException; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.qe.ConnectContext; @@ -50,8 +49,6 @@ public void analyze(Analyzer analyzer) throws UserException { if (!Env.getCurrentEnv().getAccessManager().checkGlobalPriv(ConnectContext.get(), PrivPredicate.ADMIN)) { ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "ADMIN"); } - - FeNameFormat.checkWorkloadGroupName(workloadGroupName); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index 03679d64330ed3..5ca530669dd1b0 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -246,6 +246,7 @@ import org.apache.doris.qe.VariableMgr; import org.apache.doris.resource.AdmissionControl; import org.apache.doris.resource.Tag; +import org.apache.doris.resource.workloadgroup.CreateInternalWorkloadGroupThread; import org.apache.doris.resource.workloadgroup.WorkloadGroupMgr; import org.apache.doris.resource.workloadschedpolicy.WorkloadRuntimeStatusMgr; import org.apache.doris.resource.workloadschedpolicy.WorkloadSchedPolicyMgr; @@ -1751,6 +1752,7 @@ private void startMasterOnlyDaemonThreads() { WorkloadSchedPolicyPublisher wpPublisher = new WorkloadSchedPolicyPublisher(this); topicPublisherThread.addToTopicPublisherList(wpPublisher); topicPublisherThread.start(); + new CreateInternalWorkloadGroupThread().start(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java b/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java index f137c4cab49fb6..b50eeffbcedd36 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java @@ -50,6 +50,9 @@ public class FeConstants { // set to false to disable internal schema db public static boolean enableInternalSchemaDb = true; + // for UT, create internal workload group thread can not start + public static boolean shouldCreateInternalWorkloadGroup = true; + // default scheduler interval is 10 seconds public static int default_scheduler_interval_millisecond = 10000; diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/CreateInternalWorkloadGroupThread.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/CreateInternalWorkloadGroupThread.java new file mode 100644 index 00000000000000..7c6d0e3a080818 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/CreateInternalWorkloadGroupThread.java @@ -0,0 +1,55 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.resource.workloadgroup; + +import org.apache.doris.catalog.Env; +import org.apache.doris.common.FeConstants; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +public class CreateInternalWorkloadGroupThread extends Thread { + + private static final Logger LOG = LogManager.getLogger(CreateInternalWorkloadGroupThread.class); + + public CreateInternalWorkloadGroupThread() { + super("CreateInternalWorkloadGroupThread"); + } + + public void run() { + if (!FeConstants.shouldCreateInternalWorkloadGroup) { + return; + } + try { + Env env = Env.getCurrentEnv(); + while (!env.isReady()) { + Thread.sleep(5000); + } + if (!env.getWorkloadGroupMgr() + .isWorkloadGroupExists(WorkloadGroupMgr.INTERNAL_GROUP_NAME)) { + env.getWorkloadGroupMgr().createInternalWorkloadGroup(); + LOG.info("create internal workload group succ"); + } else { + LOG.info("internal workload group already exists."); + } + } catch (Throwable t) { + LOG.warn("create internal workload group failed. ", t); + } + } + +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java index aa9bed42d7d125..a026025c9187a0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java @@ -30,8 +30,10 @@ import org.apache.doris.persist.gson.GsonUtils; import org.apache.doris.thrift.TPipelineWorkloadGroup; import org.apache.doris.thrift.TWorkloadGroupInfo; +import org.apache.doris.thrift.TWorkloadType; import org.apache.doris.thrift.TopicInfo; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.gson.annotations.SerializedName; import org.apache.commons.lang3.StringUtils; @@ -43,8 +45,11 @@ import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Optional; +import java.util.Set; public class WorkloadGroup implements Writable, GsonPostProcessable { private static final Logger LOG = LogManager.getLogger(WorkloadGroup.class); @@ -79,6 +84,11 @@ public class WorkloadGroup implements Writable, GsonPostProcessable { public static final String REMOTE_READ_BYTES_PER_SECOND = "remote_read_bytes_per_second"; + // it's used to define Doris's internal workload group, + // currently it is internal, only contains compaction + // later more type and workload may be included in the future. 
+ public static final String INTERNAL_TYPE = "internal_type"; + // NOTE(wb): all property is not required, some properties default value is set in be // default value is as followed // cpu_share=1024, memory_limit=0%(0 means not limit), enable_memory_overcommit=true @@ -87,7 +97,10 @@ public class WorkloadGroup implements Writable, GsonPostProcessable { .add(MAX_QUEUE_SIZE).add(QUEUE_TIMEOUT).add(CPU_HARD_LIMIT).add(SCAN_THREAD_NUM) .add(MAX_REMOTE_SCAN_THREAD_NUM).add(MIN_REMOTE_SCAN_THREAD_NUM) .add(SPILL_THRESHOLD_LOW_WATERMARK).add(SPILL_THRESHOLD_HIGH_WATERMARK) - .add(TAG).add(READ_BYTES_PER_SECOND).add(REMOTE_READ_BYTES_PER_SECOND).build(); + .add(TAG).add(READ_BYTES_PER_SECOND).add(REMOTE_READ_BYTES_PER_SECOND).add(INTERNAL_TYPE).build(); + + public static final ImmutableMap WORKLOAD_TYPE_MAP = new ImmutableMap.Builder() + .put(TWorkloadType.INTERNAL.toString().toLowerCase(), TWorkloadType.INTERNAL.getValue()).build(); public static final int SPILL_LOW_WATERMARK_DEFAULT_VALUE = 50; public static final int SPILL_HIGH_WATERMARK_DEFAULT_VALUE = 80; @@ -386,18 +399,6 @@ private static void checkProperties(Map properties) throws DdlEx + SPILL_THRESHOLD_LOW_WATERMARK + "(" + lowWaterMark + ")"); } - String tagStr = properties.get(TAG); - if (!StringUtils.isEmpty(tagStr)) { - String[] tagArr = tagStr.split(","); - for (String tag : tagArr) { - try { - FeNameFormat.checkCommonName("workload group tag name", tag); - } catch (AnalysisException e) { - throw new DdlException("workload group tag name format is illegal, " + tagStr); - } - } - } - if (properties.containsKey(READ_BYTES_PER_SECOND)) { String readBytesVal = properties.get(READ_BYTES_PER_SECOND); try { @@ -427,6 +428,37 @@ private static void checkProperties(Map properties) throws DdlEx } } + String tagStr = properties.get(TAG); + if (!StringUtils.isEmpty(tagStr)) { + String[] tagArr = tagStr.split(","); + for (String tag : tagArr) { + try { + FeNameFormat.checkCommonName("workload group tag", tag); + } catch (AnalysisException e) { + throw new DdlException("tag format is illegal, " + tagStr); + } + } + } + + // internal workload group is usually created by Doris. + // If exception happens here, it means thrift not match WORKLOAD_TYPE_MAP. 
+ String interTypeId = properties.get(WorkloadGroup.INTERNAL_TYPE); + if (!StringUtils.isEmpty(interTypeId)) { + int wid = Integer.valueOf(interTypeId); + if (TWorkloadType.findByValue(wid) == null) { + throw new DdlException("error internal type id: " + wid + ", current id map:" + WORKLOAD_TYPE_MAP); + } + } + + } + + + Optional getInternalTypeId() { + String typeIdStr = this.properties.get(INTERNAL_TYPE); + if (StringUtils.isEmpty(typeIdStr)) { + return Optional.empty(); + } + return Optional.of(Integer.valueOf(typeIdStr)); } public long getId() { @@ -535,8 +567,18 @@ public int getCpuHardLimit() { return cpuHardLimit; } - public String getTag() { - return properties.get(TAG); + public Optional> getTag() { + String tagStr = properties.get(TAG); + if (StringUtils.isEmpty(tagStr)) { + return Optional.empty(); + } + + Set tagSet = new HashSet<>(); + String[] ss = tagStr.split(","); + for (String str : ss) { + tagSet.add(str); + } + return Optional.of(tagSet); } @Override @@ -550,7 +592,14 @@ public TPipelineWorkloadGroup toThrift() { public TopicInfo toTopicInfo() { TWorkloadGroupInfo tWorkloadGroupInfo = new TWorkloadGroupInfo(); - tWorkloadGroupInfo.setId(id); + + long wgId = this.id; + Optional internalTypeId = getInternalTypeId(); + if (internalTypeId.isPresent()) { + wgId = internalTypeId.get(); + } + tWorkloadGroupInfo.setId(wgId); + tWorkloadGroupInfo.setName(name); tWorkloadGroupInfo.setVersion(version); diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java index a7ffddbf74ae53..d21bcc5ace5469 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java @@ -42,6 +42,7 @@ import org.apache.doris.qe.ConnectContext; import org.apache.doris.thrift.TPipelineWorkloadGroup; import org.apache.doris.thrift.TUserIdentity; +import org.apache.doris.thrift.TWorkloadType; import org.apache.doris.thrift.TopicInfo; import com.google.common.base.Strings; @@ -49,7 +50,6 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.gson.annotations.SerializedName; -import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -62,6 +62,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -71,6 +72,12 @@ public class WorkloadGroupMgr extends MasterDaemon implements Writable, GsonPost public static final Long DEFAULT_GROUP_ID = 1L; + public static final String INTERNAL_GROUP_NAME = "_internal"; + + // internal_type_id could be converted to workload group id when Workload published to BE + // refer WorkloadGroup.toTopicInfo + public static final Long INTERNAL_TYPE_ID = Long.valueOf(TWorkloadType.INTERNAL.getValue()); + public static final ImmutableList WORKLOAD_GROUP_PROC_NODE_TITLE_NAMES = new ImmutableList.Builder() .add("Id").add("Name").add(WorkloadGroup.CPU_SHARE).add(WorkloadGroup.MEMORY_LIMIT) .add(WorkloadGroup.ENABLE_MEMORY_OVERCOMMIT) @@ -367,44 +374,84 @@ public void createWorkloadGroup(CreateWorkloadGroupStmt stmt) throws DdlExceptio LOG.info("Create workload group success: {}", workloadGroup); } + public void createInternalWorkloadGroup() { + Map properties = Maps.newHashMap(); + // 100 is 
cgroup v2 default cpu_share value + properties.put(WorkloadGroup.CPU_SHARE, "100"); + properties.put(WorkloadGroup.INTERNAL_TYPE, String.valueOf(INTERNAL_TYPE_ID)); + WorkloadGroup wg = new WorkloadGroup(Env.getCurrentEnv().getNextId(), INTERNAL_GROUP_NAME, properties); + writeLock(); + try { + if (!nameToWorkloadGroup.containsKey(wg.getName())) { + nameToWorkloadGroup.put(wg.getName(), wg); + idToWorkloadGroup.put(wg.getId(), wg); + Env.getCurrentEnv().getEditLog().logCreateWorkloadGroup(wg); + } + } finally { + writeUnlock(); + } + } + // NOTE: used for checking sum value of 100% for cpu_hard_limit and memory_limit // when create/alter workload group with same tag. // when oldWg is null it means caller is an alter stmt. private void checkGlobalUnlock(WorkloadGroup newWg, WorkloadGroup oldWg) throws DdlException { - String wgTag = newWg.getTag(); - double sumOfAllMemLimit = 0; - int sumOfAllCpuHardLimit = 0; - for (Map.Entry entry : idToWorkloadGroup.entrySet()) { - WorkloadGroup wg = entry.getValue(); - if (!StringUtils.equals(wgTag, wg.getTag())) { - continue; - } + Optional> newWgTag = newWg.getTag(); + Set newWgTagSet = null; + if (newWgTag.isPresent()) { + newWgTagSet = newWgTag.get(); + } else { + newWgTagSet = new HashSet<>(); + newWgTagSet.add(null); + } - if (oldWg != null && entry.getKey() == oldWg.getId()) { - continue; - } + for (String newWgOneTag : newWgTagSet) { + double sumOfAllMemLimit = 0; + int sumOfAllCpuHardLimit = 0; - if (wg.getCpuHardLimit() > 0) { - sumOfAllCpuHardLimit += wg.getCpuHardLimit(); - } - if (wg.getMemoryLimitPercent() > 0) { - sumOfAllMemLimit += wg.getMemoryLimitPercent(); + // 1 get sum value of all wg which has same tag without current wg + for (Map.Entry entry : idToWorkloadGroup.entrySet()) { + WorkloadGroup wg = entry.getValue(); + Optional> wgTag = wg.getTag(); + + if (oldWg != null && entry.getKey() == oldWg.getId()) { + continue; + } + + if (newWgOneTag == null) { + if (wgTag.isPresent()) { + continue; + } + } else if (!wgTag.isPresent() || (!wgTag.get().contains(newWgOneTag))) { + continue; + } + + if (wg.getCpuHardLimit() > 0) { + sumOfAllCpuHardLimit += wg.getCpuHardLimit(); + } + if (wg.getMemoryLimitPercent() > 0) { + sumOfAllMemLimit += wg.getMemoryLimitPercent(); + } } - } - sumOfAllMemLimit += newWg.getMemoryLimitPercent(); - sumOfAllCpuHardLimit += newWg.getCpuHardLimit(); + // 2 sum current wg value + sumOfAllMemLimit += newWg.getMemoryLimitPercent(); + sumOfAllCpuHardLimit += newWg.getCpuHardLimit(); - if (sumOfAllMemLimit > 100.0 + 1e-6) { - throw new DdlException( - "The sum of all workload group " + WorkloadGroup.MEMORY_LIMIT + " within tag " + wgTag - + " cannot be greater than 100.0%."); - } + // 3 check total sum + if (sumOfAllMemLimit > 100.0 + 1e-6) { + throw new DdlException( + "The sum of all workload group " + WorkloadGroup.MEMORY_LIMIT + " within tag " + ( + newWgTag.isPresent() ? newWgTag.get() : "") + + " cannot be greater than 100.0%. current sum val:" + sumOfAllMemLimit); + } - if (sumOfAllCpuHardLimit > 100) { - throw new DdlException( - "sum of all workload group " + WorkloadGroup.CPU_HARD_LIMIT + " within tag " - + wgTag + " can not be greater than 100% "); + if (sumOfAllCpuHardLimit > 100) { + throw new DdlException( + "sum of all workload group " + WorkloadGroup.CPU_HARD_LIMIT + " within tag " + ( + newWgTag.isPresent() + ? 
newWgTag.get() : "") + " can not be greater than 100% "); + } } } @@ -438,8 +485,8 @@ public void alterWorkloadGroup(AlterWorkloadGroupStmt stmt) throws DdlException public void dropWorkloadGroup(DropWorkloadGroupStmt stmt) throws DdlException { String workloadGroupName = stmt.getWorkloadGroupName(); - if (DEFAULT_GROUP_NAME.equals(workloadGroupName)) { - throw new DdlException("Dropping default workload group " + workloadGroupName + " is not allowed"); + if (DEFAULT_GROUP_NAME.equals(workloadGroupName) || INTERNAL_GROUP_NAME.equals(workloadGroupName)) { + throw new DdlException("Dropping workload group " + workloadGroupName + " is not allowed"); } // if a workload group exists in user property, it should not be dropped diff --git a/fe/fe-core/src/test/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgrTest.java index 5f1e35659667ab..d729881358e429 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgrTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgrTest.java @@ -235,4 +235,226 @@ public void testAlterWorkloadGroup() throws UserException { } Assert.assertTrue(tWorkloadGroup1.getWorkloadGroupInfo().getCpuShare() == 5); } + + @Test + public void testMultiTagCreateWorkloadGroup() throws UserException { + Config.enable_workload_group = true; + WorkloadGroupMgr workloadGroupMgr = new WorkloadGroupMgr(); + + { + String name = "empty_g1"; + Map properties = Maps.newHashMap(); + properties.put(WorkloadGroup.MEMORY_LIMIT, "50%"); + properties.put(WorkloadGroup.TAG, ""); + CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties); + workloadGroupMgr.createWorkloadGroup(createStmt); + } + + { + String name = "empty_g2"; + Map properties = Maps.newHashMap(); + properties.put(WorkloadGroup.MEMORY_LIMIT, "10%"); + CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties); + workloadGroupMgr.createWorkloadGroup(createStmt); + } + + { + String name = "not_empty_g1"; + Map properties = Maps.newHashMap(); + properties.put(WorkloadGroup.MEMORY_LIMIT, "30%"); + properties.put(WorkloadGroup.TAG, "cn1,cn2"); + CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties); + workloadGroupMgr.createWorkloadGroup(createStmt); + } + + { + String name = "not_empty_g2"; + Map properties = Maps.newHashMap(); + properties.put(WorkloadGroup.MEMORY_LIMIT, "30%"); + properties.put(WorkloadGroup.TAG, "cn3,cn2"); + CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties); + workloadGroupMgr.createWorkloadGroup(createStmt); + } + + + { + String name = "not_empty_g3"; + Map properties = Maps.newHashMap(); + properties.put(WorkloadGroup.MEMORY_LIMIT, "70%"); + properties.put(WorkloadGroup.TAG, "cn2,cn100"); + try { + CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties); + workloadGroupMgr.createWorkloadGroup(createStmt); + } catch (DdlException e) { + Assert.assertTrue(e.getMessage().contains("The sum of all workload group " + WorkloadGroup.MEMORY_LIMIT)); + } + } + + { + String name = "not_empty_g3"; + Map properties = Maps.newHashMap(); + properties.put(WorkloadGroup.MEMORY_LIMIT, "70%"); + properties.put(WorkloadGroup.TAG, "cn3,cn100"); + CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties); + 
workloadGroupMgr.createWorkloadGroup(createStmt); + } + + { + String name = "not_empty_g5"; + Map properties = Maps.newHashMap(); + properties.put(WorkloadGroup.MEMORY_LIMIT, "70%"); + properties.put(WorkloadGroup.TAG, "cn5"); + CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties); + workloadGroupMgr.createWorkloadGroup(createStmt); + } + + { + String name = "not_empty_g6"; + Map properties = Maps.newHashMap(); + properties.put(WorkloadGroup.MEMORY_LIMIT, "30%"); + properties.put(WorkloadGroup.TAG, "cn5"); + CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties); + workloadGroupMgr.createWorkloadGroup(createStmt); + } + + { + String name = "not_empty_g7"; + Map properties = Maps.newHashMap(); + properties.put(WorkloadGroup.MEMORY_LIMIT, "70%"); + properties.put(WorkloadGroup.TAG, "cn5"); + try { + CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties); + workloadGroupMgr.createWorkloadGroup(createStmt); + } catch (DdlException e) { + Assert.assertTrue(e.getMessage().contains("The sum of all workload group " + WorkloadGroup.MEMORY_LIMIT)); + } + } + + } + + + @Test + public void testMultiTagAlterWorkloadGroup() throws UserException { + Config.enable_workload_group = true; + WorkloadGroupMgr workloadGroupMgr = new WorkloadGroupMgr(); + { + String name = "empty_g1"; + Map properties = Maps.newHashMap(); + properties.put(WorkloadGroup.MEMORY_LIMIT, "50%"); + properties.put(WorkloadGroup.TAG, ""); + CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties); + workloadGroupMgr.createWorkloadGroup(createStmt); + } + + { + String name = "empty_g2"; + Map properties = Maps.newHashMap(); + properties.put(WorkloadGroup.MEMORY_LIMIT, "10%"); + CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties); + workloadGroupMgr.createWorkloadGroup(createStmt); + } + + { + String name = "not_empty_g1"; + Map properties = Maps.newHashMap(); + properties.put(WorkloadGroup.MEMORY_LIMIT, "30%"); + properties.put(WorkloadGroup.TAG, "cn1,cn2"); + CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties); + workloadGroupMgr.createWorkloadGroup(createStmt); + } + + { + String name = "not_empty_g2"; + Map properties = Maps.newHashMap(); + properties.put(WorkloadGroup.MEMORY_LIMIT, "30%"); + properties.put(WorkloadGroup.TAG, "cn3,cn2"); + CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties); + workloadGroupMgr.createWorkloadGroup(createStmt); + } + + { + String name = "not_empty_g3"; + Map properties = Maps.newHashMap(); + properties.put(WorkloadGroup.MEMORY_LIMIT, "30%"); + properties.put(WorkloadGroup.TAG, "cn2,cn100"); + CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties); + workloadGroupMgr.createWorkloadGroup(createStmt); + } + + { + String name = "not_empty_g3"; + Map properties = Maps.newHashMap(); + properties.put(WorkloadGroup.MEMORY_LIMIT, "70%"); + properties.put(WorkloadGroup.TAG, "cn2,cn100"); + AlterWorkloadGroupStmt alterStmt = new AlterWorkloadGroupStmt(name, properties); + try { + workloadGroupMgr.alterWorkloadGroup(alterStmt); + } catch (DdlException e) { + Assert.assertTrue(e.getMessage().contains("The sum of all workload group " + WorkloadGroup.MEMORY_LIMIT)); + } + } + } + + + @Test + public void testMultiTagCreateWorkloadGroupWithNoTag() throws UserException { + Config.enable_workload_group = true; + WorkloadGroupMgr 
workloadGroupMgr = new WorkloadGroupMgr(); + + { + String name = "not_empty_g1"; + Map properties = Maps.newHashMap(); + properties.put(WorkloadGroup.MEMORY_LIMIT, "30%"); + properties.put(WorkloadGroup.TAG, "cn1,cn2"); + CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties); + workloadGroupMgr.createWorkloadGroup(createStmt); + } + + { + String name = "not_empty_g2"; + Map properties = Maps.newHashMap(); + properties.put(WorkloadGroup.MEMORY_LIMIT, "30%"); + properties.put(WorkloadGroup.TAG, "cn3,cn2"); + CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties); + workloadGroupMgr.createWorkloadGroup(createStmt); + } + + // create not tag workload group + { + String name = "no_tag_g1"; + Map properties = Maps.newHashMap(); + properties.put(WorkloadGroup.MEMORY_LIMIT, "10%"); + properties.put(WorkloadGroup.TAG, ""); + CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties); + workloadGroupMgr.createWorkloadGroup(createStmt); + } + + { + String name = "no_tag_g2"; + Map properties = Maps.newHashMap(); + properties.put(WorkloadGroup.MEMORY_LIMIT, "30%"); + CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties); + workloadGroupMgr.createWorkloadGroup(createStmt); + } + + { + String name = "no_tag_g3"; + Map properties = Maps.newHashMap(); + properties.put(WorkloadGroup.MEMORY_LIMIT, "70%"); + CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties); + try { + workloadGroupMgr.createWorkloadGroup(createStmt); + } catch (DdlException e) { + Assert.assertTrue(e.getMessage().contains("The sum of all workload group " + WorkloadGroup.MEMORY_LIMIT)); + } + } + + { + String name = "no_tag_g3"; + Map properties = Maps.newHashMap(); + properties.put(WorkloadGroup.MEMORY_LIMIT, "30%"); + CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties); + workloadGroupMgr.createWorkloadGroup(createStmt); + } + } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java index aa5ebf83292b20..76eb0fbb88aaa0 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java +++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java @@ -153,6 +153,7 @@ public Set getEnableNereidsRules() { @BeforeAll public final void beforeAll() throws Exception { FeConstants.enableInternalSchemaDb = false; + FeConstants.shouldCreateInternalWorkloadGroup = false; beforeCreatingConnectContext(); connectContext = createDefaultCtx(); beforeCluster(); diff --git a/gensrc/thrift/BackendService.thrift b/gensrc/thrift/BackendService.thrift index 6a5e4035066fd2..e2cd9f3572d7c3 100644 --- a/gensrc/thrift/BackendService.thrift +++ b/gensrc/thrift/BackendService.thrift @@ -243,6 +243,9 @@ struct TPublishTopicResult { 1: required Status.TStatus status } +enum TWorkloadType { + INTERNAL = 2 +} service BackendService { // Called by coord to start asynchronous execution of plan fragment in backend. 
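The new TWorkloadType enum pins the internal group to a stable id (INTERNAL = 2). On the
FE side, WorkloadGroup.toTopicInfo publishes a group carrying the internal_type property
under that fixed enum value instead of its FE catalog id, which is how the BE's
INTERNAL_WORKLOAD_GROUP_ID and the FE's _internal group line up. A hedged sketch of the
remapping (an illustrative helper in C++; the actual FE code is the Java shown above):

#include <cstdint>
#include <optional>

// TWorkloadType::INTERNAL from the thrift change above.
constexpr uint64_t INTERNAL_WORKLOAD_GROUP_ID = 2;

// Internal groups are published under the fixed enum value; normal groups
// keep their FE-generated catalog id.
uint64_t published_id(uint64_t catalog_id, std::optional<int> internal_type_id) {
    return internal_type_id.has_value() ? static_cast<uint64_t>(*internal_type_id)
                                        : catalog_id;
}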
diff --git a/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy b/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy index b272c67c85e629..96de9535314880 100644 --- a/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy +++ b/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy @@ -168,6 +168,30 @@ suite("test_crud_wlg") { exception "can not be greater than 100%" } + // test alter tag and type + test { + sql "alter workload group test_group properties ( 'internal_type'='13' );" + + exception "internal_type can not be create or modified" + } + + test { + sql "create workload group inter_wg properties('internal_type'='123');" + exception "internal_type can not be create or modified" + } + + test { + sql "alter workload group normal properties ('tag'='123')" + + exception "_internal and normal group can not set tag" + } + + test { + sql "alter workload group _internal properties ('tag'='123')" + + exception "_internal and normal group can not set tag" + } + sql "alter workload group test_group properties ( 'cpu_hard_limit'='20%' );" qt_cpu_hard_limit_1 """ select count(1) from ${table_name} """ qt_cpu_hard_limit_2 "select name,cpu_share,memory_limit,enable_memory_overcommit,max_concurrency,max_queue_size,queue_timeout,cpu_hard_limit,scan_thread_num from information_schema.workload_groups where name in ('normal','test_group') order by name;" @@ -475,6 +499,11 @@ suite("test_crud_wlg") { // test workload group's tag property, cpu_hard_limit + test { + sql "create workload group tag_test properties('tag'=' a, b , c ');" + exception "tag format is illegal" + } + test { sql "create workload group if not exists tag1_wg1 properties ( 'cpu_hard_limit'='101%', 'tag'='tag1')" exception "must be a positive integer"
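The tag-related test cases above exercise the per-tag accounting in
WorkloadGroupMgr.checkGlobalUnlock: for each tag the new or altered group carries (or a
single "untagged" bucket when it has none), the memory_limit sum over all groups sharing
that tag, including the new group and excluding the group being replaced, must stay at or
below 100%. An illustrative C++ sketch of that check, not the FE's Java implementation:

#include <cstdint>
#include <optional>
#include <set>
#include <stdexcept>
#include <string>
#include <vector>

struct WgSketch {
    uint64_t id;
    double mem_limit_pct;                      // 0 means no limit configured
    std::optional<std::set<std::string>> tags; // empty optional = untagged
};

void check_mem_limit(const std::vector<WgSketch>& groups, const WgSketch& new_wg,
                     const WgSketch* old_wg) {
    // Collect the buckets the new group competes in: one per tag, or the
    // single "untagged" bucket (represented by nullopt).
    std::set<std::optional<std::string>> buckets;
    if (new_wg.tags) {
        for (const auto& t : *new_wg.tags) buckets.insert(t);
    } else {
        buckets.insert(std::nullopt);
    }
    for (const auto& bucket : buckets) {
        double sum = new_wg.mem_limit_pct;
        for (const auto& wg : groups) {
            if (old_wg != nullptr && wg.id == old_wg->id) continue; // being replaced
            bool in_bucket = bucket.has_value()
                                     ? (wg.tags && wg.tags->count(*bucket) > 0)
                                     : !wg.tags.has_value();
            if (in_bucket) sum += wg.mem_limit_pct;
        }
        if (sum > 100.0 + 1e-6) {
            throw std::runtime_error("memory_limit sum within tag exceeds 100%");
        }
    }
}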