From 9823243dde893e0dd8510bfc7601a4f021eb8a3a Mon Sep 17 00:00:00 2001
From: Vladislav Senin
Date: Sat, 26 Aug 2023 01:05:19 +0300
Subject: [PATCH] added several new metrics

ServerStats:
- split the single last_update_ timestamp into last_update_aggr_stats
  (5 second period for shared worker stats / aggregation) and
  last_update_statshouse (1 second period);
- update_this_worker_stats() now sends the result of get_self_mem_stats()
  to StatsHouse through add_worker_memory_stats() at most once per second.

StatsHouseClient:
- new metric kphp_workers_memory (vm, vm_peak, rss, rss_peak), tagged with
  the cluster name and the worker type ("general" or "job");
- kphp_memory_script_usage: real_memory_used is now tagged "real_used"
  instead of "free";
- instance cache element counters are read through unpack(), i.e. with a
  relaxed atomic load;
- master stats additionally report kphp_workers_jobs_queue_size,
  kphp_workers_job_messages, kphp_job_workers_shared_messages,
  kphp_job_workers_shared_extra_buffers (per buffer size) and
  kphp_job_workers_shared_memory (limit / used).
---
Review notes (ignored by git am):
- kphp_workers_job_messages used the tag "acquire" for the `acquired`
  counter; changed to "acquired" to match the counter name and the
  sibling kphp_job_workers_shared_messages metric.
- NOTE(review): line breaks, indentation and blank context lines were
  rebuilt from the hunk line counts; template arguments (template<class T>,
  std::atomic<T>, vk::singleton<...>, std::array<...>) were restored from
  usage - confirm against the tree.
- NOTE(review): the system header name in the first context include of
  statshouse-client.h and the author's address could not be recovered and
  are left as found; restore them before applying.
- The index hash of statshouse-client.cpp no longer matches the new
  content (tag fix and two added comment lines).

 server/server-stats.cpp                 |  17 +++-
 server/server-stats.h                   |   3 +-
 server/statshouse/statshouse-client.cpp | 124 ++++++++++++++++++++----
 server/statshouse/statshouse-client.h   |  12 +++
 4 files changed, 131 insertions(+), 25 deletions(-)

diff --git a/server/server-stats.cpp b/server/server-stats.cpp
index 7800fdf011..188a4d211f 100644
--- a/server/server-stats.cpp
+++ b/server/server-stats.cpp
@@ -603,7 +603,8 @@ void ServerStats::after_fork(pid_t worker_pid, uint64_t active_connections, uint
   worker_type_ = worker_type;
   gen_->seed(worker_pid);
   shared_stats_->workers.reset_worker_stats(worker_pid, active_connections, max_connections, worker_process_id_);
-  last_update_ = std::chrono::steady_clock::now();
+  last_update_aggr_stats = std::chrono::steady_clock::now();
+  last_update_statshouse = std::chrono::steady_clock::now();
 }
 
 void ServerStats::add_request_stats(double script_time_sec, double net_time_sec, int64_t script_queries, int64_t long_script_queries, int64_t memory_used,
@@ -642,9 +643,15 @@ void ServerStats::add_job_common_memory_stats(int64_t common_request_memory_used
 
 void ServerStats::update_this_worker_stats() noexcept {
   const auto now_tp = std::chrono::steady_clock::now();
-  if (now_tp - last_update_ >= std::chrono::seconds{5}) {
+  if (now_tp - last_update_aggr_stats >= std::chrono::seconds{5}) {
     shared_stats_->workers.update_worker_stats(worker_process_id_);
-    last_update_ = now_tp;
+    last_update_aggr_stats = now_tp;
+  }
+
+  if (now_tp - last_update_statshouse >= std::chrono::seconds{1}) {
+    auto virtual_memory_stat = get_self_mem_stats();
+    StatsHouseClient::get().add_worker_memory_stats(worker_type_, virtual_memory_stat);
+    last_update_statshouse = now_tp;
   }
 }
 
@@ -666,11 +673,11 @@ void ServerStats::set_running_worker_status() noexcept {
 
 void ServerStats::aggregate_stats() noexcept {
   const auto now_tp = std::chrono::steady_clock::now();
-  if (now_tp - last_update_ < std::chrono::seconds{5}) {
+  if (now_tp - last_update_aggr_stats < std::chrono::seconds{5}) {
     return;
   }
 
-  last_update_ = now_tp;
+  last_update_aggr_stats = now_tp;
 
   const auto &workers_control = vk::singleton<WorkersControl>::get();
   const uint16_t general_workers = workers_control.get_count(WorkerType::general_worker);
diff --git a/server/server-stats.h b/server/server-stats.h
index c38c2dbc2d..9c5198c367 100644
--- a/server/server-stats.h
+++ b/server/server-stats.h
@@ -59,7 +59,8 @@ class ServerStats : vk::not_copyable {
   WorkerType worker_type_{WorkerType::general_worker};
   uint16_t worker_process_id_{0};
 
-  std::chrono::steady_clock::time_point last_update_;
+  std::chrono::steady_clock::time_point last_update_aggr_stats;
+  std::chrono::steady_clock::time_point last_update_statshouse;
 
   std::mt19937 *gen_{nullptr};
 
diff --git a/server/statshouse/statshouse-client.cpp b/server/statshouse/statshouse-client.cpp
index aa7e771353..41f8d623c2 100644
--- a/server/statshouse/statshouse-client.cpp
+++ b/server/statshouse/statshouse-client.cpp
@@ -6,15 +6,24 @@
 
 #include "common/precise-time.h"
 #include "runtime/instance-cache.h"
-#include "server/job-workers/job-stats.h"
 #include "server/job-workers/shared-memory-manager.h"
 #include "server/json-logger.h"
 #include "server/server-config.h"
 #include "server/server-stats.h"
-#include "server/workers-stats.h"
 
 StatsHouseClient *StatsHouseClient::inner = nullptr;
 
+// Reads a statistics counter with a relaxed atomic load.
+template<class T>
+T unpack(const std::atomic<T> &value) {
+  return value.load(std::memory_order_relaxed);
+}
+
+// Memory held by buffers that are acquired but not yet released; 0 when released >= acquired.
+inline size_t get_memory_used(size_t acquired, size_t released, size_t buffer_size) {
+  return acquired > released ? (acquired - released) * buffer_size : 0;
+}
+
 StatsHouseClient::StatsHouseClient(const std::string &ip, int port)
   : transport(ip, port){};
 
@@ -26,7 +35,7 @@ void StatsHouseClient::add_request_stats(WorkerType raw_worker_type, uint64_t sc
   transport.metric("kphp_request_time").tag(cluster_name).tag("net").tag(worker_type).write_value(net_time_ns);
 
   transport.metric("kphp_memory_script_usage").tag(cluster_name).tag("used").tag(worker_type).write_value(memory_used);
-  transport.metric("kphp_memory_script_usage").tag(cluster_name).tag("free").tag(worker_type).write_value(real_memory_used);
+  transport.metric("kphp_memory_script_usage").tag(cluster_name).tag("real_used").tag(worker_type).write_value(real_memory_used);
 
   transport.metric("kphp_requests_outgoing_queries").tag(cluster_name).tag(worker_type).write_value(script_queries);
   transport.metric("kphp_requests_outgoing_long_queries").tag(cluster_name).tag(worker_type).write_value(long_script_queries);
@@ -50,9 +59,18 @@ void StatsHouseClient::add_job_common_memory_stats(uint64_t job_common_request_m
   transport.metric("kphp_job_common_request_memory").tag(cluster_name).tag("real_used").write_value(job_common_request_real_memory_used);
 }
 
-void StatsHouseClient::add_common_master_stats(const workers_stats_t &workers_stats, const memory_resource::MemoryStats &memory_stats,
-                                               double cpu_s_usage, double cpu_u_usage,
-                                               long long int instance_cache_memory_swaps_ok, long long int instance_cache_memory_swaps_fail) {
+void StatsHouseClient::add_worker_memory_stats(WorkerType raw_worker_type, const mem_info_t &mem_stats) {
+  const char *cluster_name = vk::singleton<ServerConfig>::get().get_cluster_name();
+  const char *worker_type = raw_worker_type == WorkerType::general_worker ? "general" : "job";
+  transport.metric("kphp_workers_memory").tag(cluster_name).tag(worker_type).tag("vm_peak").write_value(mem_stats.vm_peak);
+  transport.metric("kphp_workers_memory").tag(cluster_name).tag(worker_type).tag("vm").write_value(mem_stats.vm);
+  transport.metric("kphp_workers_memory").tag(cluster_name).tag(worker_type).tag("rss").write_value(mem_stats.rss);
+  transport.metric("kphp_workers_memory").tag(cluster_name).tag(worker_type).tag("rss_peak").write_value(mem_stats.rss_peak);
+}
+
+void StatsHouseClient::add_common_master_stats(const workers_stats_t &workers_stats, const memory_resource::MemoryStats &memory_stats, double cpu_s_usage,
+                                               double cpu_u_usage, long long int instance_cache_memory_swaps_ok,
+                                               long long int instance_cache_memory_swaps_fail) {
   const char *cluster_name = vk::singleton<ServerConfig>::get().get_cluster_name();
   if (engine_tag) {
     transport.metric("kphp_version").tag(cluster_name).write_value(atoll(engine_tag));
@@ -98,32 +116,100 @@ void StatsHouseClient::add_common_master_stats(const workers_stats_t &workers_st
   transport.metric("kphp_instance_cache_memory_buffer_swaps").tag(cluster_name).tag("fail").write_value(instance_cache_memory_swaps_fail);
 
   const auto &instance_cache_element_stats = instance_cache_get_stats();
-  transport.metric("kphp_instance_cache_elements").tag(cluster_name).tag("stored").write_value(instance_cache_element_stats.elements_stored);
+  transport.metric("kphp_instance_cache_elements").tag(cluster_name).tag("stored").write_value(unpack(instance_cache_element_stats.elements_stored));
   transport.metric("kphp_instance_cache_elements")
     .tag(cluster_name)
     .tag("stored_with_delay")
-    .write_value(instance_cache_element_stats.elements_stored_with_delay);
+    .write_value(unpack(instance_cache_element_stats.elements_stored_with_delay));
   transport.metric("kphp_instance_cache_elements")
     .tag(cluster_name)
     .tag("storing_skipped_due_recent_update")
-    .write_value(instance_cache_element_stats.elements_storing_skipped_due_recent_update);
+    .write_value(unpack(instance_cache_element_stats.elements_storing_skipped_due_recent_update));
   transport.metric("kphp_instance_cache_elements")
     .tag(cluster_name)
     .tag("storing_delayed_due_mutex")
-    .write_value(instance_cache_element_stats.elements_storing_delayed_due_mutex);
-  transport.metric("kphp_instance_cache_elements").tag(cluster_name).tag("fetched").write_value(instance_cache_element_stats.elements_fetched);
-  transport.metric("kphp_instance_cache_elements").tag(cluster_name).tag("missed").write_value(instance_cache_element_stats.elements_missed);
-  transport.metric("kphp_instance_cache_elements").tag(cluster_name).tag("missed_earlier").write_value(instance_cache_element_stats.elements_missed_earlier);
-  transport.metric("kphp_instance_cache_elements").tag(cluster_name).tag("expired").write_value(instance_cache_element_stats.elements_expired);
-  transport.metric("kphp_instance_cache_elements").tag(cluster_name).tag("created").write_value(instance_cache_element_stats.elements_created);
-  transport.metric("kphp_instance_cache_elements").tag(cluster_name).tag("destroyed").write_value(instance_cache_element_stats.elements_destroyed);
-  transport.metric("kphp_instance_cache_elements").tag(cluster_name).tag("cached").write_value(instance_cache_element_stats.elements_cached);
+    .write_value(unpack(instance_cache_element_stats.elements_storing_delayed_due_mutex));
+  transport.metric("kphp_instance_cache_elements").tag(cluster_name).tag("fetched").write_value(unpack(instance_cache_element_stats.elements_fetched));
+  transport.metric("kphp_instance_cache_elements").tag(cluster_name).tag("missed").write_value(unpack(instance_cache_element_stats.elements_missed));
+  transport.metric("kphp_instance_cache_elements")
+    .tag(cluster_name)
+    .tag("missed_earlier")
+    .write_value(unpack(instance_cache_element_stats.elements_missed_earlier));
+  transport.metric("kphp_instance_cache_elements").tag(cluster_name).tag("expired").write_value(unpack(instance_cache_element_stats.elements_expired));
+  transport.metric("kphp_instance_cache_elements").tag(cluster_name).tag("created").write_value(unpack(instance_cache_element_stats.elements_created));
+  transport.metric("kphp_instance_cache_elements").tag(cluster_name).tag("destroyed").write_value(unpack(instance_cache_element_stats.elements_destroyed));
+  transport.metric("kphp_instance_cache_elements").tag(cluster_name).tag("cached").write_value(unpack(instance_cache_element_stats.elements_cached));
   transport.metric("kphp_instance_cache_elements")
     .tag(cluster_name)
     .tag("logically_expired_and_ignored")
-    .write_value(instance_cache_element_stats.elements_logically_expired_and_ignored);
+    .write_value(unpack(instance_cache_element_stats.elements_logically_expired_and_ignored));
   transport.metric("kphp_instance_cache_elements")
     .tag(cluster_name)
     .tag("logically_expired_but_fetched")
-    .write_value(instance_cache_element_stats.elements_logically_expired_but_fetched);
+    .write_value(unpack(instance_cache_element_stats.elements_logically_expired_but_fetched));
+
+  using namespace job_workers;
+  const JobStats &job_stats = vk::singleton<SharedMemoryManager>::get().get_stats();
+  transport.metric("kphp_workers_jobs_queue_size").tag(cluster_name).write_value(unpack(job_stats.job_queue_size));
+
+  transport.metric("kphp_workers_job_messages").tag(cluster_name).tag("reserved").write_value(job_stats.messages.count);
+  transport.metric("kphp_workers_job_messages").tag(cluster_name).tag("acquire_fails").write_value(unpack(job_stats.messages.acquire_fails));
+  transport.metric("kphp_workers_job_messages").tag(cluster_name).tag("acquired").write_value(unpack(job_stats.messages.acquired));
+  transport.metric("kphp_workers_job_messages").tag(cluster_name).tag("released").write_value(unpack(job_stats.messages.released));
+
+  this->add_job_workers_shared_memory_stats(cluster_name, job_stats);
+}
+
+void StatsHouseClient::add_job_workers_shared_memory_stats(const char *cluster_name, const job_workers::JobStats &job_stats) {
+  using namespace job_workers;
+
+  size_t total_used = this->add_job_workers_shared_messages_stats(cluster_name, job_stats.messages, JOB_SHARED_MESSAGE_BYTES);
+
+  constexpr std::array<const char *, JOB_EXTRA_MEMORY_BUFFER_BUCKETS> extra_memory_prefixes{
+    "256kb", "512kb", "1mb", "2mb", "4mb", "8mb", "16mb", "32mb", "64mb",
+  };
+  for (size_t i = 0; i != JOB_EXTRA_MEMORY_BUFFER_BUCKETS; ++i) {
+    const size_t buffer_size = get_extra_shared_memory_buffer_size(i);
+    total_used += this->add_job_workers_shared_memory_buffers_stats(cluster_name, job_stats.extra_memory[i], extra_memory_prefixes[i], buffer_size);
+  }
+
+  transport.metric("kphp_job_workers_shared_memory").tag(cluster_name).tag("limit").write_value(job_stats.memory_limit);
+  transport.metric("kphp_job_workers_shared_memory").tag(cluster_name).tag("used").write_value(total_used);
+}
+
+size_t StatsHouseClient::add_job_workers_shared_messages_stats(const char *cluster_name, const job_workers::JobStats::MemoryBufferStats &memory_buffers_stats,
+                                                               size_t buffer_size) {
+  using namespace job_workers;
+
+  const size_t acquired_buffers = unpack(memory_buffers_stats.acquired);
+  const size_t released_buffers = unpack(memory_buffers_stats.released);
+  const size_t memory_used = get_memory_used(acquired_buffers, released_buffers, buffer_size);
+
+  transport.metric("kphp_job_workers_shared_messages").tag(cluster_name).tag("reserved").write_value(memory_buffers_stats.count);
+  transport.metric("kphp_job_workers_shared_messages").tag(cluster_name).tag("acquire_fails").write_value(unpack(memory_buffers_stats.acquire_fails));
+  transport.metric("kphp_job_workers_shared_messages").tag(cluster_name).tag("acquired").write_value(acquired_buffers);
+  transport.metric("kphp_job_workers_shared_messages").tag(cluster_name).tag("released").write_value(released_buffers);
+
+  return memory_used;
+}
+
+size_t StatsHouseClient::add_job_workers_shared_memory_buffers_stats(const char *cluster_name,
+                                                                     const job_workers::JobStats::MemoryBufferStats &memory_buffers_stats, const char *size_tag,
+                                                                     size_t buffer_size) {
+  using namespace job_workers;
+
+  const size_t acquired_buffers = unpack(memory_buffers_stats.acquired);
+  const size_t released_buffers = unpack(memory_buffers_stats.released);
+  const size_t memory_used = get_memory_used(acquired_buffers, released_buffers, buffer_size);
+
+  transport.metric("kphp_job_workers_shared_extra_buffers").tag(cluster_name).tag(size_tag).tag("reserved").write_value(memory_buffers_stats.count);
+  transport.metric("kphp_job_workers_shared_extra_buffers")
+    .tag(cluster_name)
+    .tag(size_tag)
+    .tag("acquire_fails")
+    .write_value(unpack(memory_buffers_stats.acquire_fails));
+  transport.metric("kphp_job_workers_shared_extra_buffers").tag(cluster_name).tag(size_tag).tag("acquired").write_value(acquired_buffers);
+  transport.metric("kphp_job_workers_shared_extra_buffers").tag(cluster_name).tag(size_tag).tag("released").write_value(released_buffers);
+
+  return memory_used;
 }
diff --git a/server/statshouse/statshouse-client.h b/server/statshouse/statshouse-client.h
index e418419565..8266fb4b9d 100644
--- a/server/statshouse/statshouse-client.h
+++ b/server/statshouse/statshouse-client.h
@@ -8,8 +8,10 @@
 
 #include
 
+#include "common/dl-utils-lite.h"
 #include "common/mixin/not_copyable.h"
 #include "runtime/memory_resource/memory_resource.h"
+#include "server/job-workers/job-stats.h"
 #include "server/workers-control.h"
 #include "server/workers-stats.h"
 
@@ -37,6 +39,8 @@ class StatsHouseClient : vk::not_copyable {
 
   void add_job_common_memory_stats(uint64_t job_common_request_memory_used, uint64_t job_common_request_real_memory_used);
 
+  void add_worker_memory_stats(WorkerType raw_worker_type, const mem_info_t &mem_stats);
+
   /**
    * Must be called from master process only
    */
@@ -46,6 +50,14 @@ class StatsHouseClient : vk::not_copyable {
 private:
   explicit StatsHouseClient(const std::string &ip, int port);
 
+  void add_job_workers_shared_memory_stats(const char *cluster_name, const job_workers::JobStats &job_stats);
+
+  size_t add_job_workers_shared_messages_stats(const char *cluster_name, const job_workers::JobStats::MemoryBufferStats &memory_buffers_stats,
+                                               size_t buffer_size);
+
+  size_t add_job_workers_shared_memory_buffers_stats(const char *cluster_name, const job_workers::JobStats::MemoryBufferStats &memory_buffers_stats,
+                                                     const char *size_tag, size_t buffer_size);
+
   static StatsHouseClient *inner;
   statshouse::TransportUDP transport;
 };