Skip to content

Commit

Permalink
Revert "[branch-2.1]add internal workload group (apache#42006) (apach…
Browse files Browse the repository at this point in the history
…e#44592)"

This reverts commit bdca9cc.
  • Loading branch information
wangbo committed Jan 3, 2025
1 parent 0250384 commit dddb337
Show file tree
Hide file tree
Showing 27 changed files with 152 additions and 712 deletions.
6 changes: 3 additions & 3 deletions be/src/agent/cgroup_cpu_ctl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -158,11 +158,11 @@ uint64_t CgroupCpuCtl::cpu_soft_limit_default_value() {
return _is_enable_cgroup_v2_in_env ? 100 : 1024;
}

std::shared_ptr<CgroupCpuCtl> CgroupCpuCtl::create_cgroup_cpu_ctl(uint64_t wg_id) {
std::unique_ptr<CgroupCpuCtl> CgroupCpuCtl::create_cgroup_cpu_ctl(uint64_t wg_id) {
if (_is_enable_cgroup_v2_in_env) {
return std::make_shared<CgroupV2CpuCtl>(wg_id);
return std::make_unique<CgroupV2CpuCtl>(wg_id);
} else if (_is_enable_cgroup_v1_in_env) {
return std::make_shared<CgroupV1CpuCtl>(wg_id);
return std::make_unique<CgroupV1CpuCtl>(wg_id);
}
return nullptr;
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/agent/cgroup_cpu_ctl.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class CgroupCpuCtl {

static Status delete_unused_cgroup_path(std::set<uint64_t>& used_wg_ids);

static std::shared_ptr<CgroupCpuCtl> create_cgroup_cpu_ctl(uint64_t wg_id);
static std::unique_ptr<CgroupCpuCtl> create_cgroup_cpu_ctl(uint64_t wg_id);

static bool is_a_valid_cgroup_path(std::string cg_path);

Expand Down
6 changes: 4 additions & 2 deletions be/src/agent/topic_subscriber.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,14 @@ void TopicSubscriber::handle_topic_info(const TPublishTopicRequest& topic_reques
// eg, update workload info may delay other listener, then we need add a thread here
// to handle_topic_info asynchronous
std::shared_lock lock(_listener_mtx);
LOG(INFO) << "[topic_publish]begin handle topic info";
for (auto& listener_pair : _registered_listeners) {
if (topic_request.topic_map.find(listener_pair.first) != topic_request.topic_map.end()) {
LOG(INFO) << "[topic_publish]begin handle topic " << listener_pair.first
<< ", size=" << topic_request.topic_map.at(listener_pair.first).size();
listener_pair.second->handle_topic_info(
topic_request.topic_map.at(listener_pair.first));
LOG(INFO) << "[topic_publish]finish handle topic " << listener_pair.first
<< ", size=" << topic_request.topic_map.at(listener_pair.first).size();
LOG(INFO) << "[topic_publish]finish handle topic " << listener_pair.first;
}
}
}
Expand Down
77 changes: 23 additions & 54 deletions be/src/olap/olap_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ static int32_t get_single_replica_compaction_threads_num(size_t data_dirs_num) {
return threads_num;
}

Status StorageEngine::start_bg_threads(std::shared_ptr<WorkloadGroup> wg_sptr) {
Status StorageEngine::start_bg_threads() {
RETURN_IF_ERROR(Thread::create(
"StorageEngine", "unused_rowset_monitor_thread",
[this]() { this->_unused_rowset_monitor_thread_callback(); },
Expand Down Expand Up @@ -155,60 +155,29 @@ Status StorageEngine::start_bg_threads(std::shared_ptr<WorkloadGroup> wg_sptr) {
auto single_replica_compaction_threads =
get_single_replica_compaction_threads_num(data_dirs.size());

if (wg_sptr && wg_sptr->get_cgroup_cpu_ctl_wptr().lock()) {
RETURN_IF_ERROR(ThreadPoolBuilder("gBaseCompactionTaskThreadPool")
.set_min_threads(base_compaction_threads)
.set_max_threads(base_compaction_threads)
.set_cgroup_cpu_ctl(wg_sptr->get_cgroup_cpu_ctl_wptr())
.build(&_base_compaction_thread_pool));
RETURN_IF_ERROR(ThreadPoolBuilder("gCumuCompactionTaskThreadPool")
.set_min_threads(cumu_compaction_threads)
.set_max_threads(cumu_compaction_threads)
.set_cgroup_cpu_ctl(wg_sptr->get_cgroup_cpu_ctl_wptr())
.build(&_cumu_compaction_thread_pool));
RETURN_IF_ERROR(ThreadPoolBuilder("gSingleReplicaCompactionTaskThreadPool")
.set_min_threads(single_replica_compaction_threads)
.set_max_threads(single_replica_compaction_threads)
.set_cgroup_cpu_ctl(wg_sptr->get_cgroup_cpu_ctl_wptr())
.build(&_single_replica_compaction_thread_pool));

if (config::enable_segcompaction) {
RETURN_IF_ERROR(ThreadPoolBuilder("gSegCompactionTaskThreadPool")
.set_min_threads(config::segcompaction_num_threads)
.set_max_threads(config::segcompaction_num_threads)
.set_cgroup_cpu_ctl(wg_sptr->get_cgroup_cpu_ctl_wptr())
.build(&_seg_compaction_thread_pool));
}
RETURN_IF_ERROR(ThreadPoolBuilder("gColdDataCompactionTaskThreadPool")
.set_min_threads(config::cold_data_compaction_thread_num)
.set_max_threads(config::cold_data_compaction_thread_num)
.set_cgroup_cpu_ctl(wg_sptr->get_cgroup_cpu_ctl_wptr())
.build(&_cold_data_compaction_thread_pool));
} else {
RETURN_IF_ERROR(ThreadPoolBuilder("BaseCompactionTaskThreadPool")
.set_min_threads(base_compaction_threads)
.set_max_threads(base_compaction_threads)
.build(&_base_compaction_thread_pool));
RETURN_IF_ERROR(ThreadPoolBuilder("CumuCompactionTaskThreadPool")
.set_min_threads(cumu_compaction_threads)
.set_max_threads(cumu_compaction_threads)
.build(&_cumu_compaction_thread_pool));
RETURN_IF_ERROR(ThreadPoolBuilder("SingleReplicaCompactionTaskThreadPool")
.set_min_threads(single_replica_compaction_threads)
.set_max_threads(single_replica_compaction_threads)
.build(&_single_replica_compaction_thread_pool));

if (config::enable_segcompaction) {
RETURN_IF_ERROR(ThreadPoolBuilder("SegCompactionTaskThreadPool")
.set_min_threads(config::segcompaction_num_threads)
.set_max_threads(config::segcompaction_num_threads)
.build(&_seg_compaction_thread_pool));
}
RETURN_IF_ERROR(ThreadPoolBuilder("ColdDataCompactionTaskThreadPool")
.set_min_threads(config::cold_data_compaction_thread_num)
.set_max_threads(config::cold_data_compaction_thread_num)
.build(&_cold_data_compaction_thread_pool));
RETURN_IF_ERROR(ThreadPoolBuilder("BaseCompactionTaskThreadPool")
.set_min_threads(base_compaction_threads)
.set_max_threads(base_compaction_threads)
.build(&_base_compaction_thread_pool));
RETURN_IF_ERROR(ThreadPoolBuilder("CumuCompactionTaskThreadPool")
.set_min_threads(cumu_compaction_threads)
.set_max_threads(cumu_compaction_threads)
.build(&_cumu_compaction_thread_pool));
RETURN_IF_ERROR(ThreadPoolBuilder("SingleReplicaCompactionTaskThreadPool")
.set_min_threads(single_replica_compaction_threads)
.set_max_threads(single_replica_compaction_threads)
.build(&_single_replica_compaction_thread_pool));

if (config::enable_segcompaction) {
RETURN_IF_ERROR(ThreadPoolBuilder("SegCompactionTaskThreadPool")
.set_min_threads(config::segcompaction_num_threads)
.set_max_threads(config::segcompaction_num_threads)
.build(&_seg_compaction_thread_pool));
}
RETURN_IF_ERROR(ThreadPoolBuilder("ColdDataCompactionTaskThreadPool")
.set_min_threads(config::cold_data_compaction_thread_num)
.set_max_threads(config::cold_data_compaction_thread_num)
.build(&_cold_data_compaction_thread_pool));

// compaction tasks producer thread
RETURN_IF_ERROR(Thread::create(
Expand Down
3 changes: 1 addition & 2 deletions be/src/olap/storage_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ class TxnManager;
class ReportWorker;
class CreateTabletIdxCache;
struct DirInfo;
class WorkloadGroup;

using SegCompactionCandidates = std::vector<segment_v2::SegmentSharedPtr>;
using SegCompactionCandidatesSharedPtr = std::shared_ptr<SegCompactionCandidates>;
Expand Down Expand Up @@ -172,7 +171,7 @@ class StorageEngine {
}

// start all background threads. This should be call after env is ready.
Status start_bg_threads(std::shared_ptr<WorkloadGroup> wg_sptr = nullptr);
Status start_bg_threads();

// clear trash and snapshot file
// option: update disk usage after sweep
Expand Down
4 changes: 2 additions & 2 deletions be/src/pipeline/task_scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ class TaskScheduler {
public:
TaskScheduler(ExecEnv* exec_env, std::shared_ptr<BlockedTaskScheduler> b_scheduler,
std::shared_ptr<TaskQueue> task_queue, std::string name,
std::shared_ptr<CgroupCpuCtl> cgroup_cpu_ctl)
CgroupCpuCtl* cgroup_cpu_ctl)
: _task_queue(std::move(task_queue)),
_blocked_task_scheduler(std::move(b_scheduler)),
_shutdown(false),
Expand All @@ -102,7 +102,7 @@ class TaskScheduler {
std::shared_ptr<BlockedTaskScheduler> _blocked_task_scheduler;
std::atomic<bool> _shutdown;
std::string _name;
std::weak_ptr<CgroupCpuCtl> _cgroup_cpu_ctl;
CgroupCpuCtl* _cgroup_cpu_ctl = nullptr;

void _do_work(size_t index);
};
Expand Down
4 changes: 1 addition & 3 deletions be/src/runtime/exec_env_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,6 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths,
_pipeline_tracer_ctx = std::make_unique<pipeline::PipelineTracerContext>(); // before query
RETURN_IF_ERROR(init_pipeline_task_scheduler());
_workload_group_manager = new WorkloadGroupMgr();
_workload_group_manager->init_internal_workload_group();
_scanner_scheduler = new doris::vectorized::ScannerScheduler();
_fragment_mgr = new FragmentMgr(this);
_result_cache = new ResultCache(config::query_cache_max_size_mb,
Expand Down Expand Up @@ -297,8 +296,7 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths,
return st;
}
_storage_engine->set_heartbeat_flags(this->heartbeat_flags());
WorkloadGroupPtr internal_wg = _workload_group_manager->get_internal_wg();
if (st = _storage_engine->start_bg_threads(internal_wg); !st.ok()) {
if (st = _storage_engine->start_bg_threads(); !st.ok()) {
LOG(ERROR) << "Failed to starge bg threads of storage engine, res=" << st;
return st;
}
Expand Down
Loading

0 comments on commit dddb337

Please sign in to comment.