diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 1528d9091c2f43..68b082c08734a6 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1216,6 +1216,8 @@ DEFINE_mString(doris_cgroup_cpu_path, "");
 DEFINE_mBool(enable_be_proc_monitor, "false");
 DEFINE_mInt32(be_proc_monitor_interval_ms, "10000");
 
+DEFINE_Int32(workload_group_metrics_interval_ms, "5000");
+
 DEFINE_mBool(enable_workload_group_memory_gc, "true");
 
 DEFINE_Bool(ignore_always_true_predicate_for_segment, "true");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 3bdbeb95edb65d..c0b2e19b49a6be 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1295,6 +1295,7 @@ DECLARE_mBool(exit_on_exception);
 DECLARE_mString(doris_cgroup_cpu_path);
 DECLARE_mBool(enable_be_proc_monitor);
 DECLARE_mInt32(be_proc_monitor_interval_ms);
+DECLARE_Int32(workload_group_metrics_interval_ms);
 
 DECLARE_mBool(enable_workload_group_memory_gc);
 
diff --git a/be/src/common/daemon.cpp b/be/src/common/daemon.cpp
index d3d55f10dde5fb..73035ecf3957eb 100644
--- a/be/src/common/daemon.cpp
+++ b/be/src/common/daemon.cpp
@@ -522,6 +522,13 @@ void Daemon::be_proc_monitor_thread() {
     }
 }
 
+void Daemon::calculate_workload_group_metrics_thread() {
+    while (!_stop_background_threads_latch.wait_for(
+            std::chrono::milliseconds(config::workload_group_metrics_interval_ms))) {
+        ExecEnv::GetInstance()->workload_group_mgr()->refresh_workload_group_metrics();
+    }
+}
+
 void Daemon::start() {
     Status st;
     st = Thread::create(
@@ -570,6 +577,12 @@ void Daemon::start() {
                 &_threads.emplace_back());
     }
     CHECK(st.ok()) << st;
+
+    st = Thread::create(
+            "Daemon", "workload_group_metrics",
+            [this]() { this->calculate_workload_group_metrics_thread(); },
+            &_threads.emplace_back());
+    CHECK(st.ok()) << st;
 }
 
 void Daemon::stop() {
diff --git a/be/src/common/daemon.h b/be/src/common/daemon.h
index fe723877dcd027..bd635f5a4b1920 100644
--- a/be/src/common/daemon.h
+++ b/be/src/common/daemon.h
@@ -47,6 +47,7 @@ class Daemon {
     void cache_prune_stale_thread();
     void report_runtime_query_statistics_thread();
     void be_proc_monitor_thread();
+    void calculate_workload_group_metrics_thread();
 
     CountDownLatch _stop_background_threads_latch;
     std::vector<scoped_refptr<Thread>> _threads;
diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp
index 6f9e59c8291966..6814881ac7a300 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -315,7 +315,7 @@ Status PipelineTask::execute(bool* eos) {
         if (cpu_qs) {
             cpu_qs->add_cpu_nanos(delta_cpu_time);
         }
-        query_context()->update_wg_cpu_adder(delta_cpu_time);
+        query_context()->update_cpu_time(delta_cpu_time);
     }};
     if (_wait_to_start()) {
         return Status::OK();
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index 35431d54394268..621c5ebca90cad 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -241,9 +241,9 @@ class QueryContext {
     // only for file scan node
     std::map<int, TFileScanRangeParams> file_scan_range_params_map;
 
-    void update_wg_cpu_adder(int64_t delta_cpu_time) {
+    void update_cpu_time(int64_t delta_cpu_time) {
         if (_workload_group != nullptr) {
-            _workload_group->update_cpu_adder(delta_cpu_time);
+            _workload_group->update_cpu_time(delta_cpu_time);
         }
     }
 
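For orientation: the call sites touched below (pipeline tasks, scanners, async result writers) funnel CPU-time deltas through `QueryContext::update_cpu_time()` into the owning workload group, where they only accumulate; the conversion into a per-second rate happens later in the daemon thread added above. A standalone sketch of that accumulate-then-forward pattern (illustrative only, not Doris code; a wall-clock stopwatch stands in for the CPU stopwatch the real code uses):

```cpp
// Sketch: each task measures its own time slice and adds only the delta to a
// shared, monotonically growing per-group counter; a background thread later
// turns the counter into a rate.
#include <atomic>
#include <chrono>
#include <cstdint>
#include <cstdio>

struct GroupCpuAccount {
    std::atomic<uint64_t> cpu_time_nanos {0};
    void update_cpu_time(int64_t delta_ns) { cpu_time_nanos += delta_ns; }
};

void run_task_once(GroupCpuAccount& group) {
    auto start = std::chrono::steady_clock::now();
    // ... task body would run here ...
    int64_t delta_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(
                               std::chrono::steady_clock::now() - start)
                               .count();
    group.update_cpu_time(delta_ns); // report only the delta, lock-free
}

int main() {
    GroupCpuAccount group;
    run_task_once(group);
    std::printf("%llu ns accumulated\n",
                static_cast<unsigned long long>(group.cpu_time_nanos.load()));
    return 0;
}
```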
diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h
index 72e1532e58d4f1..e0a44af69c1d66 100644
--- a/be/src/runtime/thread_context.h
+++ b/be/src/runtime/thread_context.h
@@ -119,38 +119,40 @@
         __VA_ARGS__;                                                      \
     } while (0)
 
-#define LIMIT_LOCAL_SCAN_IO(data_dir, bytes_read)                         \
-    std::shared_ptr<IOThrottle> iot = nullptr;                            \
-    auto* t_ctx = doris::thread_context(true);                            \
-    if (t_ctx) {                                                          \
-        iot = t_ctx->get_local_scan_io_throttle(data_dir);                \
-    }                                                                     \
-    if (iot) {                                                            \
-        iot->acquire(-1);                                                 \
-    }                                                                     \
-    Defer defer {                                                         \
-        [&]() {                                                           \
-            if (iot) {                                                    \
-                iot->update_next_io_time(*bytes_read);                    \
-                t_ctx->update_total_local_scan_io_adder(*bytes_read);     \
-            }                                                             \
-        }                                                                 \
+#define LIMIT_LOCAL_SCAN_IO(data_dir, bytes_read)                   \
+    std::shared_ptr<IOThrottle> iot = nullptr;                      \
+    auto* t_ctx = doris::thread_context(true);                      \
+    if (t_ctx) {                                                    \
+        iot = t_ctx->get_local_scan_io_throttle(data_dir);          \
+    }                                                               \
+    if (iot) {                                                      \
+        iot->acquire(-1);                                           \
+    }                                                               \
+    Defer defer {                                                   \
+        [&]() {                                                     \
+            if (iot) {                                              \
+                iot->update_next_io_time(*bytes_read);              \
+                t_ctx->update_local_scan_io(data_dir, *bytes_read); \
+            }                                                       \
+        }                                                           \
     }
 
-#define LIMIT_REMOTE_SCAN_IO(bytes_read)                  \
-    std::shared_ptr<IOThrottle> iot = nullptr;            \
-    if (auto* t_ctx = doris::thread_context(true)) {      \
-        iot = t_ctx->get_remote_scan_io_throttle();       \
-    }                                                     \
-    if (iot) {                                            \
-        iot->acquire(-1);                                 \
-    }                                                     \
-    Defer defer {                                         \
-        [&]() {                                           \
-            if (iot) {                                    \
-                iot->update_next_io_time(*bytes_read);    \
-            }                                             \
-        }                                                 \
+#define LIMIT_REMOTE_SCAN_IO(bytes_read)                   \
+    std::shared_ptr<IOThrottle> iot = nullptr;             \
+    auto* t_ctx = doris::thread_context(true);             \
+    if (t_ctx) {                                           \
+        iot = t_ctx->get_remote_scan_io_throttle();        \
+    }                                                      \
+    if (iot) {                                             \
+        iot->acquire(-1);                                  \
+    }                                                      \
+    Defer defer {                                          \
+        [&]() {                                            \
+            if (iot) {                                     \
+                iot->update_next_io_time(*bytes_read);     \
+                t_ctx->update_remote_scan_io(*bytes_read); \
+            }                                              \
+        }                                                  \
     }
 
 namespace doris {
@@ -282,9 +284,15 @@ class ThreadContext {
         return nullptr;
     }
 
-    void update_total_local_scan_io_adder(size_t bytes_read) {
+    void update_local_scan_io(std::string path, size_t bytes_read) {
+        if (std::shared_ptr<WorkloadGroup> wg_ptr = _wg_wptr.lock()) {
+            wg_ptr->update_local_scan_io(path, bytes_read);
+        }
+    }
+
+    void update_remote_scan_io(size_t bytes_read) {
         if (std::shared_ptr<WorkloadGroup> wg_ptr = _wg_wptr.lock()) {
-            wg_ptr->update_total_local_scan_io_adder(bytes_read);
+            wg_ptr->update_remote_scan_io(bytes_read);
         }
     }
diff --git a/be/src/runtime/workload_group/workload_group.cpp b/be/src/runtime/workload_group/workload_group.cpp
index f9405de12737dc..6b9388af30a7f7 100644
--- a/be/src/runtime/workload_group/workload_group.cpp
+++ b/be/src/runtime/workload_group/workload_group.cpp
@@ -35,6 +35,7 @@
 #include "runtime/exec_env.h"
 #include "runtime/memory/global_memory_arbitrator.h"
 #include "runtime/memory/mem_tracker_limiter.h"
+#include "runtime/workload_group/workload_group_metrics.h"
 #include "runtime/workload_management/io_throttle.h"
 #include "util/mem_info.h"
 #include "util/parse_util.h"
@@ -71,18 +72,11 @@ WorkloadGroup::WorkloadGroup(const WorkloadGroupInfo& tg_info, bool need_create_
           _need_create_query_thread_pool(need_create_query_thread_pool) {
     std::vector<DataDirInfo>& data_dir_list = io::BeConfDataDirReader::be_config_data_dir_list;
     for (const auto& data_dir : data_dir_list) {
-        _scan_io_throttle_map[data_dir.path] =
-                std::make_shared<IOThrottle>(_name, data_dir.bvar_name + "_read_bytes");
-    }
-    _remote_scan_io_throttle = std::make_shared<IOThrottle>(_name, "remote_read_bytes");
-    _mem_used_status = std::make_unique<bvar::Status<int64_t>>(_name, "memory_used", 0);
-    _cpu_usage_adder = std::make_unique<bvar::Adder<uint64_t>>(_name, "cpu_usage_adder");
-    _cpu_usage_per_second = std::make_unique<bvar::PerSecond<bvar::Adder<uint64_t>>>(
-            _name, "cpu_usage", _cpu_usage_adder.get(), 10);
-    _total_local_scan_io_adder =
-            std::make_unique<bvar::Adder<size_t>>(_name, "total_local_read_bytes");
-    _total_local_scan_io_per_second = std::make_unique<bvar::PerSecond<bvar::Adder<size_t>>>(
-            _name, "total_local_read_bytes_per_second", _total_local_scan_io_adder.get(), 1);
+        _scan_io_throttle_map[data_dir.path] = std::make_shared<IOThrottle>(data_dir.bvar_name);
+    }
+    _remote_scan_io_throttle = std::make_shared<IOThrottle>();
+
+    _wg_metrics = std::make_shared<WorkloadGroupMetrics>(this);
 }
 
 std::string WorkloadGroup::debug_string() const {
@@ -169,11 +163,11 @@ int64_t WorkloadGroup::make_memory_tracker_snapshots(
     }
     // refresh total memory used.
     _total_mem_used = used_memory;
+    _wg_metrics->update_memory_used_bytes(used_memory);
     // reserve memory is recorded in the query mem tracker
     // and _total_mem_used already contains all the current reserve memory.
     // so after refreshing _total_mem_used, reset _wg_refresh_interval_memory_growth.
     _wg_refresh_interval_memory_growth.store(0.0);
-    _mem_used_status->set_value(used_memory);
     return used_memory;
 }
 
@@ -658,16 +652,20 @@ std::shared_ptr<IOThrottle> WorkloadGroup::get_remote_scan_io_throttle() {
     return _remote_scan_io_throttle;
 }
 
-void WorkloadGroup::update_cpu_adder(int64_t delta_cpu_time) {
-    (*_cpu_usage_adder) << (uint64_t)delta_cpu_time;
+void WorkloadGroup::update_cpu_time(int64_t delta_cpu_time) {
+    _wg_metrics->update_cpu_time_nanos(delta_cpu_time);
+}
+
+void WorkloadGroup::update_local_scan_io(std::string path, size_t scan_bytes) {
+    _wg_metrics->update_local_scan_io_bytes(path, (uint64_t)scan_bytes);
 }
 
-void WorkloadGroup::update_total_local_scan_io_adder(size_t scan_bytes) {
-    (*_total_local_scan_io_adder) << scan_bytes;
+void WorkloadGroup::update_remote_scan_io(size_t scan_bytes) {
+    _wg_metrics->update_remote_scan_io_bytes((uint64_t)scan_bytes);
 }
 
-int64_t WorkloadGroup::get_remote_scan_bytes_per_second() {
-    return _remote_scan_io_throttle->get_bvar_io_per_second();
+int64_t WorkloadGroup::get_mem_used() {
+    return _total_mem_used;
 }
 
 void WorkloadGroup::try_stop_schedulers() {
diff --git a/be/src/runtime/workload_group/workload_group.h b/be/src/runtime/workload_group/workload_group.h
index fb89ed8101ad49..35a8802e4c449a 100644
--- a/be/src/runtime/workload_group/workload_group.h
+++ b/be/src/runtime/workload_group/workload_group.h
@@ -17,7 +17,6 @@
 
 #pragma once
 
-#include <bvar/bvar.h>
 #include <gen_cpp/BackendService_types.h>
 #include <stddef.h>
 #include <stdint.h>
@@ -54,6 +53,8 @@ class TaskScheduler;
 class WorkloadGroup;
 struct WorkloadGroupInfo;
 struct TrackerLimiterGroup;
+class WorkloadGroupMetrics;
+
 class WorkloadGroup : public std::enable_shared_from_this<WorkloadGroup> {
 public:
     explicit WorkloadGroup(const WorkloadGroupInfo& tg_info);
@@ -189,16 +190,13 @@ class WorkloadGroup : public std::enable_shared_from_this<WorkloadGroup> {
 
     void upsert_scan_io_throttle(WorkloadGroupInfo* tg_info);
 
-    void update_cpu_adder(int64_t delta_cpu_time);
+    void update_cpu_time(int64_t delta_cpu_time);
 
-    void update_total_local_scan_io_adder(size_t scan_bytes);
+    void update_local_scan_io(std::string path, size_t scan_bytes);
 
-    int64_t get_mem_used() { return _mem_used_status->get_value(); }
-    uint64_t get_cpu_usage() { return _cpu_usage_per_second->get_value(); }
-    int64_t get_local_scan_bytes_per_second() {
-        return _total_local_scan_io_per_second->get_value();
-    }
-    int64_t get_remote_scan_bytes_per_second();
+    void update_remote_scan_io(size_t scan_bytes);
+
+    int64_t get_mem_used();
 
     ThreadPool* get_memtable_flush_pool_ptr() {
         // no lock here because this is called by memtable flush,
@@ -209,6 +207,10 @@ class WorkloadGroup : public std::enable_shared_from_this<WorkloadGroup> {
 
     std::weak_ptr<CgroupCpuCtl> get_cgroup_cpu_ctl_wptr();
 
+    std::shared_ptr<WorkloadGroupMetrics> get_metrics() { return _wg_metrics; }
+
+    friend class WorkloadGroupMetrics;
+
 private:
     void create_cgroup_cpu_ctl_no_lock();
     void upsert_cgroup_cpu_ctl_no_lock(WorkloadGroupInfo* wg_info);
@@ -260,12 +262,7 @@ class WorkloadGroup : public std::enable_shared_from_this<WorkloadGroup> {
     // for some background workload, it doesn't need to create query thread pool
     const bool _need_create_query_thread_pool;
 
-    // bvar metric
-    std::unique_ptr<bvar::Status<int64_t>> _mem_used_status;
-    std::unique_ptr<bvar::Adder<uint64_t>> _cpu_usage_adder;
-    std::unique_ptr<bvar::PerSecond<bvar::Adder<uint64_t>>> _cpu_usage_per_second;
-    std::unique_ptr<bvar::Adder<size_t>> _total_local_scan_io_adder;
-    std::unique_ptr<bvar::PerSecond<bvar::Adder<size_t>>> _total_local_scan_io_per_second;
+    std::shared_ptr<WorkloadGroupMetrics> _wg_metrics {nullptr};
 };
 
 using WorkloadGroupPtr = std::shared_ptr<WorkloadGroup>;
diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp b/be/src/runtime/workload_group/workload_group_manager.cpp
index e2c4b8c7f5e6c6..1e01a7ce1bafb1 100644
--- a/be/src/runtime/workload_group/workload_group_manager.cpp
+++ b/be/src/runtime/workload_group/workload_group_manager.cpp
@@ -26,6 +26,7 @@
 #include "pipeline/task_scheduler.h"
 #include "runtime/memory/mem_tracker_limiter.h"
 #include "runtime/workload_group/workload_group.h"
+#include "runtime/workload_group/workload_group_metrics.h"
 #include "util/mem_info.h"
 #include "util/threadpool.h"
 #include "util/time.h"
@@ -287,16 +288,25 @@ void WorkloadGroupMgr::get_wg_resource_usage(vectorized::Block* block) {
     for (const auto& [id, wg] : _workload_groups) {
         SchemaScannerHelper::insert_int64_value(0, be_id, block);
         SchemaScannerHelper::insert_int64_value(1, wg->id(), block);
-        SchemaScannerHelper::insert_int64_value(2, wg->get_mem_used(), block);
+        SchemaScannerHelper::insert_int64_value(2, wg->get_metrics()->get_memory_used(), block);
 
-        double cpu_usage_p =
-                (double)wg->get_cpu_usage() / (double)total_cpu_time_ns_per_second * 100;
+        double cpu_usage_p = (double)wg->get_metrics()->get_cpu_time_nanos_per_second() /
+                             (double)total_cpu_time_ns_per_second * 100;
         cpu_usage_p = std::round(cpu_usage_p * 100.0) / 100.0;
 
         SchemaScannerHelper::insert_double_value(3, cpu_usage_p, block);
-        SchemaScannerHelper::insert_int64_value(4, wg->get_local_scan_bytes_per_second(), block);
-        SchemaScannerHelper::insert_int64_value(5, wg->get_remote_scan_bytes_per_second(), block);
+        SchemaScannerHelper::insert_int64_value(
+                4, wg->get_metrics()->get_local_scan_bytes_per_second(), block);
+        SchemaScannerHelper::insert_int64_value(
+                5, wg->get_metrics()->get_remote_scan_bytes_per_second(), block);
+    }
+}
+
+void WorkloadGroupMgr::refresh_workload_group_metrics() {
+    std::shared_lock<std::shared_mutex> r_lock(_group_mutex);
+    for (const auto& [id, wg] : _workload_groups) {
+        wg->get_metrics()->refresh_metrics();
     }
 }
diff --git a/be/src/runtime/workload_group/workload_group_manager.h b/be/src/runtime/workload_group/workload_group_manager.h
index c0eb0dfc0b237e..5d75a4558ef4f8 100644
--- a/be/src/runtime/workload_group/workload_group_manager.h
+++ b/be/src/runtime/workload_group/workload_group_manager.h
@@ -76,6 +76,8 @@ class WorkloadGroupMgr {
         return _workload_groups[INTERNAL_WORKLOAD_GROUP_ID];
     }
 
+    void refresh_workload_group_metrics();
+
 private:
     std::shared_mutex _group_mutex;
     std::unordered_map<uint64_t, WorkloadGroupPtr> _workload_groups;
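To make the `cpu_usage_p` computation in `get_wg_resource_usage()` concrete: assuming `total_cpu_time_ns_per_second` is the host core count times 1e9 (which the new `host_cpu_num` metric later in this diff suggests), a group that consumed four cores' worth of CPU time on a 16-core host shows 25.00%. A minimal, self-contained sketch of that arithmetic with hypothetical numbers (illustrative only, not Doris code):

```cpp
// Sketch: workload-group CPU percentage as nanoseconds of CPU consumed per
// second, normalized by the host's total CPU nanoseconds per second.
#include <cmath>
#include <cstdint>
#include <cstdio>

int main() {
    const int64_t host_cores = 16;                        // assumed host size
    const double total_cpu_ns_per_sec = host_cores * 1e9; // 16 cores * 1e9 ns
    const double wg_cpu_ns_per_sec = 4e9;                 // group used 4 cores' worth
    double cpu_usage_p = wg_cpu_ns_per_sec / total_cpu_ns_per_sec * 100;
    cpu_usage_p = std::round(cpu_usage_p * 100.0) / 100.0; // keep two decimals
    std::printf("%.2f%%\n", cpu_usage_p);                  // prints 25.00%
    return 0;
}
```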
diff --git a/be/src/runtime/workload_group/workload_group_metrics.cpp b/be/src/runtime/workload_group/workload_group_metrics.cpp
new file mode 100644
index 00000000000000..18ff7aa2f4f185
--- /dev/null
+++ b/be/src/runtime/workload_group/workload_group_metrics.cpp
@@ -0,0 +1,128 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime/workload_group/workload_group_metrics.h"
+
+#include "runtime/workload_group/workload_group.h"
+#include "runtime/workload_management/io_throttle.h"
+#include "util/doris_metrics.h"
+#include "util/metrics.h"
+
+namespace doris {
+
+#include "common/compile_check_begin.h"
+
+WorkloadGroupMetrics::~WorkloadGroupMetrics() {
+    DorisMetrics::instance()->metric_registry()->deregister_entity(_entity);
+}
+
+WorkloadGroupMetrics::WorkloadGroupMetrics(WorkloadGroup* wg) {
+    _entity = DorisMetrics::instance()->metric_registry()->register_entity(
+            "workload_group." + wg->name(), {{"name", wg->name()}});
+
+    _cpu_time_metric = std::make_unique<doris::MetricPrototype>(
+            doris::MetricType::COUNTER, doris::MetricUnit::SECONDS, "workload_group_cpu_time_sec");
+    _cpu_time_counter = (IntAtomicCounter*)(_entity->register_metric<IntAtomicCounter>(
+            _cpu_time_metric.get()));
+
+    _mem_used_bytes_metric = std::make_unique<doris::MetricPrototype>(
+            doris::MetricType::COUNTER, doris::MetricUnit::BYTES, "workload_group_mem_used_bytes");
+    _mem_used_bytes_counter = (IntAtomicCounter*)(_entity->register_metric<IntAtomicCounter>(
+            _mem_used_bytes_metric.get()));
+
+    _local_scan_bytes_metric = std::make_unique<doris::MetricPrototype>(
+            doris::MetricType::COUNTER, doris::MetricUnit::BYTES,
+            "workload_group_local_scan_bytes");
+    _local_scan_bytes_counter = (IntAtomicCounter*)(_entity->register_metric<IntAtomicCounter>(
+            _local_scan_bytes_metric.get()));
+
+    _remote_scan_bytes_metric = std::make_unique<doris::MetricPrototype>(
+            doris::MetricType::COUNTER, doris::MetricUnit::BYTES,
+            "workload_group_remote_scan_bytes");
+    _remote_scan_bytes_counter = (IntAtomicCounter*)(_entity->register_metric<IntAtomicCounter>(
+            _remote_scan_bytes_metric.get()));
+
+    for (const auto& [key, io_throttle] : wg->_scan_io_throttle_map) {
+        std::unique_ptr<doris::MetricPrototype> metric = std::make_unique<doris::MetricPrototype>(
+                doris::MetricType::COUNTER, doris::MetricUnit::BYTES,
+                "workload_group_local_scan_bytes_" + io_throttle->metric_name());
+        _local_scan_bytes_counter_map[key] =
+                (IntAtomicCounter*)(_entity->register_metric<IntAtomicCounter>(metric.get()));
+        _local_scan_bytes_metric_map[key] = std::move(metric);
+    }
+}
+
+void WorkloadGroupMetrics::update_cpu_time_nanos(uint64_t delta_cpu_time) {
+    _cpu_time_nanos += delta_cpu_time;
+}
+
+void WorkloadGroupMetrics::update_memory_used_bytes(int64_t memory_used) {
+    _memory_used = memory_used;
+}
+
+void WorkloadGroupMetrics::update_local_scan_io_bytes(std::string path, uint64_t delta_io_bytes) {
+    _local_scan_bytes_counter->increment(delta_io_bytes);
+    _local_scan_bytes_counter_map[path]->increment((int64_t)delta_io_bytes);
+}
+
+void WorkloadGroupMetrics::update_remote_scan_io_bytes(uint64_t delta_io_bytes) {
+    _remote_scan_bytes_counter->increment(delta_io_bytes);
+}
+
+void WorkloadGroupMetrics::refresh_metrics() {
+    int interval_second = config::workload_group_metrics_interval_ms / 1000;
+
+    // cpu
+    uint64_t _current_cpu_time_nanos = _cpu_time_nanos.load();
+    uint64_t _cpu_time_sec = _current_cpu_time_nanos / (1000L * 1000L * 1000L);
+    _cpu_time_counter->set_value(_cpu_time_sec);
+    _per_sec_cpu_time_nanos = (_current_cpu_time_nanos - _last_cpu_time_nanos) / interval_second;
+    _last_cpu_time_nanos = _current_cpu_time_nanos;
+
+    // memory
+    _mem_used_bytes_counter->set_value(_memory_used);
+
+    // local scan
+    int64_t current_local_scan_bytes = _local_scan_bytes_counter->value();
+    _per_sec_local_scan_bytes =
+            (current_local_scan_bytes - _last_local_scan_bytes) / interval_second;
+    _last_local_scan_bytes = current_local_scan_bytes;
+
+    // remote scan
+    int64_t current_remote_scan_bytes = _remote_scan_bytes_counter->value();
+    _per_sec_remote_scan_bytes =
+            (current_remote_scan_bytes - _last_remote_scan_bytes) / interval_second;
+    _last_remote_scan_bytes = current_remote_scan_bytes;
+}
+
+uint64_t WorkloadGroupMetrics::get_cpu_time_nanos_per_second() {
+    return _per_sec_cpu_time_nanos.load();
+}
+
+int64_t WorkloadGroupMetrics::get_local_scan_bytes_per_second() {
+    return _per_sec_local_scan_bytes.load();
+}
+
+int64_t WorkloadGroupMetrics::get_remote_scan_bytes_per_second() {
+    return _per_sec_remote_scan_bytes.load();
+}
+
+int64_t WorkloadGroupMetrics::get_memory_used() {
+    return _mem_used_bytes_counter->value();
+}
+
+} // namespace doris
\ No newline at end of file
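The per-second values consumed by the system table come from `refresh_metrics()`, which divides the growth of each cumulative counter by the refresh interval (5000 ms by default, i.e. 5 seconds), so readings are only as fresh as the last refresh. A minimal standalone sketch of that delta-over-interval conversion (illustrative only, not Doris code):

```cpp
// Sketch: turn a cumulative byte counter into a bytes-per-second rate by
// sampling it once per refresh interval and dividing the delta by the
// interval length in seconds.
#include <cstdint>
#include <cstdio>

struct RateTracker {
    int64_t last_value = 0;
    int64_t per_second = 0;

    void refresh(int64_t current_value, int interval_second) {
        per_second = (current_value - last_value) / interval_second;
        last_value = current_value;
    }
};

int main() {
    RateTracker local_scan;
    // Counter grew from 0 to 50 MiB during one 5-second window:
    local_scan.refresh(50 * 1024 * 1024, 5);
    std::printf("%lld bytes/s\n",
                static_cast<long long>(local_scan.per_second)); // 10485760
    return 0;
}
```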
diff --git a/be/src/runtime/workload_group/workload_group_metrics.h b/be/src/runtime/workload_group/workload_group_metrics.h
new file mode 100644
index 00000000000000..e68715df249dee
--- /dev/null
+++ b/be/src/runtime/workload_group/workload_group_metrics.h
@@ -0,0 +1,89 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <atomic>
+#include <map>
+#include <memory>
+#include <string>
+
+namespace doris {
+
+class WorkloadGroup;
+
+template <typename T>
+class AtomicCounter;
+using IntAtomicCounter = AtomicCounter<int64_t>;
+class MetricEntity;
+struct MetricPrototype;
+
+class WorkloadGroupMetrics {
+public:
+    WorkloadGroupMetrics(WorkloadGroup* wg);
+
+    ~WorkloadGroupMetrics();
+
+    void update_cpu_time_nanos(uint64_t delta_cpu_time);
+
+    void update_memory_used_bytes(int64_t memory_used);
+
+    void update_local_scan_io_bytes(std::string path, uint64_t delta_io_bytes);
+
+    void update_remote_scan_io_bytes(uint64_t delta_io_bytes);
+
+    void refresh_metrics();
+
+    uint64_t get_cpu_time_nanos_per_second();
+
+    int64_t get_local_scan_bytes_per_second();
+
+    int64_t get_remote_scan_bytes_per_second();
+
+    int64_t get_memory_used();
+
+private:
+    std::unique_ptr<doris::MetricPrototype> _cpu_time_metric {nullptr};
+    std::unique_ptr<doris::MetricPrototype> _mem_used_bytes_metric {nullptr};
+    std::unique_ptr<doris::MetricPrototype> _local_scan_bytes_metric {nullptr};
+    std::unique_ptr<doris::MetricPrototype> _remote_scan_bytes_metric {nullptr};
+    // NOTE: _local_scan_bytes_metric is the sum of all disks' IO;
+    // _local_scan_bytes_metric_map holds one metric per disk.
+    std::map<std::string, std::unique_ptr<doris::MetricPrototype>> _local_scan_bytes_metric_map;
+
+    IntAtomicCounter* _cpu_time_counter {nullptr};          // used for metric
+    IntAtomicCounter* _mem_used_bytes_counter {nullptr};    // used for metric
+    IntAtomicCounter* _local_scan_bytes_counter {nullptr};  // used for metric
+    IntAtomicCounter* _remote_scan_bytes_counter {nullptr}; // used for metric
+    std::map<std::string, IntAtomicCounter*> _local_scan_bytes_counter_map; // used for metric
+
+    std::atomic<uint64_t> _cpu_time_nanos {0};
+    std::atomic<uint64_t> _last_cpu_time_nanos {0};
+    std::atomic<uint64_t> _per_sec_cpu_time_nanos {0}; // used for system table
+
+    std::atomic<int64_t> _per_sec_local_scan_bytes {0};
+    std::atomic<int64_t> _last_local_scan_bytes {0}; // used for system table
+
+    std::atomic<int64_t> _per_sec_remote_scan_bytes {0};
+    std::atomic<int64_t> _last_remote_scan_bytes {0}; // used for system table
+
+    std::atomic<int64_t> _memory_used {0};
+
+    std::shared_ptr<MetricEntity> _entity {nullptr};
+};
+
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/runtime/workload_management/io_throttle.cpp b/be/src/runtime/workload_management/io_throttle.cpp
index dacfa29012f59f..118fc518072272 100644
--- a/be/src/runtime/workload_management/io_throttle.cpp
+++ b/be/src/runtime/workload_management/io_throttle.cpp
@@ -22,12 +22,6 @@
 
 namespace doris {
 
-IOThrottle::IOThrottle(std::string prefix, std::string name) {
-    _io_adder = std::make_unique<bvar::Adder<size_t>>(prefix, name);
-    _io_adder_per_second = std::make_unique<bvar::PerSecond<bvar::Adder<size_t>>>(
-            prefix, name + "_per_second", _io_adder.get(), 1);
-}
-
 bool IOThrottle::acquire(int64_t block_timeout_ms) {
     if (_io_bytes_per_second_limit < 0) {
         return true;
@@ -57,11 +51,6 @@ bool IOThrottle::try_acquire() {
 }
 
 void IOThrottle::update_next_io_time(int64_t io_bytes) {
-    Defer defer {[&]() {
-        if (io_bytes > 0) {
-            (*_io_adder) << io_bytes;
-        }
-    }};
     if (_io_bytes_per_second_limit <= 0 || io_bytes <= 0) {
         return;
     }
diff --git a/be/src/runtime/workload_management/io_throttle.h b/be/src/runtime/workload_management/io_throttle.h
index 4212527020e0e2..f688922fcd29f9 100644
--- a/be/src/runtime/workload_management/io_throttle.h
+++ b/be/src/runtime/workload_management/io_throttle.h
@@ -28,7 +28,9 @@ namespace doris {
 
 class IOThrottle {
 public:
-    IOThrottle(std::string prefix, std::string name);
+    IOThrottle() = default;
+
+    IOThrottle(std::string metric_name) : _metric_name(metric_name) {}
 
     ~IOThrottle() = default;
 
@@ -41,7 +43,7 @@ class IOThrottle {
 
     void set_io_bytes_per_second(int64_t read_bytes_per_second);
 
-    size_t get_bvar_io_per_second() { return _io_adder_per_second->get_value(); }
+    std::string metric_name() { return _metric_name; }
 
 private:
     std::mutex _mutex;
@@ -49,8 +51,6 @@ class IOThrottle {
     int64_t _next_io_time_micros {0};
     std::atomic<int64_t> _io_bytes_per_second_limit {-1};
 
-    // bvar monitor
-    std::unique_ptr<bvar::Adder<size_t>> _io_adder;
-    std::unique_ptr<bvar::PerSecond<bvar::Adder<size_t>>> _io_adder_per_second;
+    std::string _metric_name;
 };
 }; // namespace doris
\ No newline at end of file
diff --git a/be/src/util/system_metrics.cpp b/be/src/util/system_metrics.cpp
index c1385b6244bf62..973f461d8defe7 100644
--- a/be/src/util/system_metrics.cpp
+++ b/be/src/util/system_metrics.cpp
@@ -38,6 +38,16 @@
 
 namespace doris {
 
+DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(host_cpu_num, MetricUnit::NOUNIT);
+struct CpuNumberMetrics {
+    CpuNumberMetrics(MetricEntity* ent) : entity(ent) {
+        INT_ATOMIC_COUNTER_METRIC_REGISTER(entity, host_cpu_num);
+    }
+
+    IntAtomicCounter* host_cpu_num {nullptr};
+    MetricEntity* entity = nullptr;
+};
+
 #define DEFINE_CPU_COUNTER_METRIC(metric)                                             \
     DEFINE_COUNTER_METRIC_PROTOTYPE_5ARG(cpu_##metric, MetricUnit::PERCENT, "", cpu,  \
                                          Labels({{"mode", #metric}}));
@@ -386,11 +396,22 @@ void SystemMetrics::update() {
 
 void SystemMetrics::_install_cpu_metrics() {
     get_cpu_name();
+
+    int cpu_num = 0;
     for (auto cpu_name : _cpu_names) {
+        // NOTE: cpu_name comes from /proc/stat; the row named 'cpu' is the aggregate,
+        // not a real core, so it should be skipped.
+        if (cpu_name != "cpu") {
+            cpu_num++;
+        }
         auto cpu_entity = _registry->register_entity(cpu_name, {{"device", cpu_name}});
         CpuMetrics* metrics = new CpuMetrics(cpu_entity.get());
         _cpu_metrics.emplace(cpu_name, metrics);
     }
+
+    auto cpu_num_entity = _registry->register_entity("doris_be_host_cpu_num");
+    _cpu_num_metrics = std::make_unique<CpuNumberMetrics>(cpu_num_entity.get());
+
+    _cpu_num_metrics->host_cpu_num->set_value(cpu_num);
 }
 
 #ifdef BE_TEST
diff --git a/be/src/util/system_metrics.h b/be/src/util/system_metrics.h
index c72ba3693012fb..29ce8c9c02b359 100644
--- a/be/src/util/system_metrics.h
+++ b/be/src/util/system_metrics.h
@@ -31,6 +31,7 @@
 namespace doris {
 
 struct CpuMetrics;
+struct CpuNumberMetrics;
 struct MemoryMetrics;
 struct DiskMetrics;
 struct NetworkMetrics;
@@ -99,6 +100,7 @@ class SystemMetrics {
     static const char* _s_hook_name;
 
     std::map<std::string, CpuMetrics*> _cpu_metrics;
+    std::unique_ptr<CpuNumberMetrics> _cpu_num_metrics;
     std::unique_ptr<MemoryMetrics> _memory_metrics;
    std::map<std::string, DiskMetrics*> _disk_metrics;
     std::map<std::string, NetworkMetrics*> _network_metrics;
diff --git a/be/src/vec/exec/scan/vscanner.cpp b/be/src/vec/exec/scan/vscanner.cpp
index 97bf563db1fa58..0087a19d92f54a 100644
--- a/be/src/vec/exec/scan/vscanner.cpp
+++ b/be/src/vec/exec/scan/vscanner.cpp
@@ -262,7 +262,7 @@ void VScanner::update_scan_cpu_timer() {
     _scan_cpu_timer += cpu_time;
     _query_statistics->add_cpu_nanos(cpu_time);
     if (_state && _state->get_query_ctx()) {
-        _state->get_query_ctx()->update_wg_cpu_adder(cpu_time);
+        _state->get_query_ctx()->update_cpu_time(cpu_time);
     }
 }
 
diff --git a/be/src/vec/sink/writer/async_result_writer.cpp b/be/src/vec/sink/writer/async_result_writer.cpp
index 65210a53ec3b55..ed4f71677f2afe 100644
--- a/be/src/vec/sink/writer/async_result_writer.cpp
+++ b/be/src/vec/sink/writer/async_result_writer.cpp
@@ -127,7 +127,7 @@ void AsyncResultWriter::process_block(RuntimeState* state, RuntimeProfile* profi
     cpu_time_stop_watch.start();
     Defer defer {[&]() {
         if (state && state->get_query_ctx()) {
-            state->get_query_ctx()->update_wg_cpu_adder(cpu_time_stop_watch.elapsed_time());
+            state->get_query_ctx()->update_cpu_time(cpu_time_stop_watch.elapsed_time());
         }
     }};
     if (!_eos && _data_queue.empty() && _writer_status.ok()) {
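The new `host_cpu_num` metric is derived from the `/proc/stat` row names collected by `get_cpu_name()`: the aggregate `cpu` row is skipped and only the per-core `cpuN` rows are counted. A standalone sketch of that counting rule (illustrative only, not Doris code):

```cpp
// Sketch: count real cores from /proc/stat-style row names, skipping the
// aggregate "cpu" row that sums all cores.
#include <cstdio>
#include <string>
#include <vector>

int count_cores(const std::vector<std::string>& cpu_names) {
    int cpu_num = 0;
    for (const auto& cpu_name : cpu_names) {
        if (cpu_name != "cpu") { // skip the aggregate row
            ++cpu_num;
        }
    }
    return cpu_num;
}

int main() {
    // A 4-core host exposes "cpu" plus "cpu0".."cpu3".
    std::printf("%d\n", count_cores({"cpu", "cpu0", "cpu1", "cpu2", "cpu3"})); // prints 4
    return 0;
}
```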