Skip to content

Commit

Permalink
added several new metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
troy4eg committed Aug 25, 2023
1 parent 4714d83 commit 9823243
Show file tree
Hide file tree
Showing 4 changed files with 129 additions and 25 deletions.
17 changes: 12 additions & 5 deletions server/server-stats.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -603,7 +603,8 @@ void ServerStats::after_fork(pid_t worker_pid, uint64_t active_connections, uint
worker_type_ = worker_type;
gen_->seed(worker_pid);
shared_stats_->workers.reset_worker_stats(worker_pid, active_connections, max_connections, worker_process_id_);
last_update_ = std::chrono::steady_clock::now();
last_update_aggr_stats = std::chrono::steady_clock::now();
last_update_statshouse = std::chrono::steady_clock::now();
}

void ServerStats::add_request_stats(double script_time_sec, double net_time_sec, int64_t script_queries, int64_t long_script_queries, int64_t memory_used,
Expand Down Expand Up @@ -642,9 +643,15 @@ void ServerStats::add_job_common_memory_stats(int64_t common_request_memory_used

// Periodic, rate-limited publication of this worker's stats. Two independent
// timers are compared against the same steady-clock reading:
//  - at most once per 5 seconds the worker's entry in the shared stats is refreshed;
//  - at most once per second the process memory stats are sent to StatsHouse.
void ServerStats::update_this_worker_stats() noexcept {
const auto now_tp = std::chrono::steady_clock::now();
if (now_tp - last_update_ >= std::chrono::seconds{5}) {
if (now_tp - last_update_aggr_stats >= std::chrono::seconds{5}) {
shared_stats_->workers.update_worker_stats(worker_process_id_);
last_update_ = now_tp;
last_update_aggr_stats = now_tp;
}

// StatsHouse memory metrics use their own, shorter (1 second) period.
if (now_tp - last_update_statshouse >= std::chrono::seconds{1}) {
auto virtual_memory_stat = get_self_mem_stats();
StatsHouseClient::get().add_worker_memory_stats(worker_type_, virtual_memory_stat);
last_update_statshouse = now_tp;
}
}

Expand All @@ -666,11 +673,11 @@ void ServerStats::set_running_worker_status() noexcept {

void ServerStats::aggregate_stats() noexcept {
const auto now_tp = std::chrono::steady_clock::now();
if (now_tp - last_update_ < std::chrono::seconds{5}) {
if (now_tp - last_update_aggr_stats < std::chrono::seconds{5}) {
return;
}

last_update_ = now_tp;
last_update_aggr_stats = now_tp;
const auto &workers_control = vk::singleton<WorkersControl>::get();

const uint16_t general_workers = workers_control.get_count(WorkerType::general_worker);
Expand Down
3 changes: 2 additions & 1 deletion server/server-stats.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ class ServerStats : vk::not_copyable {

WorkerType worker_type_{WorkerType::general_worker};
uint16_t worker_process_id_{0};
std::chrono::steady_clock::time_point last_update_;
std::chrono::steady_clock::time_point last_update_aggr_stats;
std::chrono::steady_clock::time_point last_update_statshouse;

std::mt19937 *gen_{nullptr};

Expand Down
122 changes: 103 additions & 19 deletions server/statshouse/statshouse-client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,22 @@

#include "common/precise-time.h"
#include "runtime/instance-cache.h"
#include "server/job-workers/job-stats.h"
#include "server/job-workers/shared-memory-manager.h"
#include "server/json-logger.h"
#include "server/server-config.h"
#include "server/server-stats.h"
#include "server/workers-stats.h"

StatsHouseClient *StatsHouseClient::inner = nullptr;

/**
 * Reads the current value out of an atomic.
 * The load is relaxed: only the value itself is requested, with no ordering
 * relative to other memory operations.
 */
template<typename T>
T unpack(const std::atomic<T> &value) {
  const T snapshot = value.load(std::memory_order_relaxed);
  return snapshot;
}

/**
 * Memory held by the buffers that are currently outstanding.
 *
 * @param acquired    number of buffers acquired so far
 * @param released    number of buffers released so far
 * @param buffer_size size of a single buffer
 * @return (acquired - released) * buffer_size, or 0 when acquired does not
 *         exceed released (so the unsigned subtraction can never wrap)
 */
inline size_t get_memory_used(size_t acquired, size_t released, size_t buffer_size) {
  if (acquired <= released) {
    return 0;
  }
  const size_t outstanding_buffers = acquired - released;
  return outstanding_buffers * buffer_size;
}

/**
 * Builds the client; the given ip and port are forwarded unchanged to the
 * transport member's constructor.
 */
StatsHouseClient::StatsHouseClient(const std::string &ip, int port)
  : transport(ip, port) {}

Expand All @@ -26,7 +33,7 @@ void StatsHouseClient::add_request_stats(WorkerType raw_worker_type, uint64_t sc
transport.metric("kphp_request_time").tag(cluster_name).tag("net").tag(worker_type).write_value(net_time_ns);

transport.metric("kphp_memory_script_usage").tag(cluster_name).tag("used").tag(worker_type).write_value(memory_used);
transport.metric("kphp_memory_script_usage").tag(cluster_name).tag("free").tag(worker_type).write_value(real_memory_used);
transport.metric("kphp_memory_script_usage").tag(cluster_name).tag("real_used").tag(worker_type).write_value(real_memory_used);

transport.metric("kphp_requests_outgoing_queries").tag(cluster_name).tag(worker_type).write_value(script_queries);
transport.metric("kphp_requests_outgoing_long_queries").tag(cluster_name).tag(worker_type).write_value(long_script_queries);
Expand All @@ -50,9 +57,18 @@ void StatsHouseClient::add_job_common_memory_stats(uint64_t job_common_request_m
transport.metric("kphp_job_common_request_memory").tag(cluster_name).tag("real_used").write_value(job_common_request_real_memory_used);
}

void StatsHouseClient::add_common_master_stats(const workers_stats_t &workers_stats, const memory_resource::MemoryStats &memory_stats,
double cpu_s_usage, double cpu_u_usage,
long long int instance_cache_memory_swaps_ok, long long int instance_cache_memory_swaps_fail) {
/**
 * Sends one worker's memory figures as four values of the "kphp_workers_memory" metric.
 *
 * Every value is tagged with the cluster name, the worker kind ("general" for
 * WorkerType::general_worker, "job" for anything else) and the name of the figure.
 *
 * @param raw_worker_type kind of the reporting worker
 * @param mem_stats       memory figures to send (vm_peak, vm, rss, rss_peak are used)
 */
void StatsHouseClient::add_worker_memory_stats(WorkerType raw_worker_type, const mem_info_t &mem_stats) {
  const char *cluster_name = vk::singleton<ServerConfig>::get().get_cluster_name();
  const char *worker_type = "job";
  if (raw_worker_type == WorkerType::general_worker) {
    worker_type = "general";
  }

  // All four values share the metric name and the first two tags; only the last tag and the value differ.
  const auto send = [&](const char *figure_tag, auto figure) {
    transport.metric("kphp_workers_memory").tag(cluster_name).tag(worker_type).tag(figure_tag).write_value(figure);
  };

  send("vm_peak", mem_stats.vm_peak);
  send("vm", mem_stats.vm);
  send("rss", mem_stats.rss);
  send("rss_peak", mem_stats.rss_peak);
}

void StatsHouseClient::add_common_master_stats(const workers_stats_t &workers_stats, const memory_resource::MemoryStats &memory_stats, double cpu_s_usage,
double cpu_u_usage, long long int instance_cache_memory_swaps_ok,
long long int instance_cache_memory_swaps_fail) {
const char *cluster_name = vk::singleton<ServerConfig>::get().get_cluster_name();
if (engine_tag) {
transport.metric("kphp_version").tag(cluster_name).write_value(atoll(engine_tag));
Expand Down Expand Up @@ -98,32 +114,100 @@ void StatsHouseClient::add_common_master_stats(const workers_stats_t &workers_st
transport.metric("kphp_instance_cache_memory_buffer_swaps").tag(cluster_name).tag("fail").write_value(instance_cache_memory_swaps_fail);

const auto &instance_cache_element_stats = instance_cache_get_stats();
transport.metric("kphp_instance_cache_elements").tag(cluster_name).tag("stored").write_value(instance_cache_element_stats.elements_stored);
transport.metric("kphp_instance_cache_elements").tag(cluster_name).tag("stored").write_value(unpack(instance_cache_element_stats.elements_stored));
transport.metric("kphp_instance_cache_elements")
.tag(cluster_name)
.tag("stored_with_delay")
.write_value(instance_cache_element_stats.elements_stored_with_delay);
.write_value(unpack(instance_cache_element_stats.elements_stored_with_delay));
transport.metric("kphp_instance_cache_elements")
.tag(cluster_name)
.tag("storing_skipped_due_recent_update")
.write_value(instance_cache_element_stats.elements_storing_skipped_due_recent_update);
.write_value(unpack(instance_cache_element_stats.elements_storing_skipped_due_recent_update));
transport.metric("kphp_instance_cache_elements")
.tag(cluster_name)
.tag("storing_delayed_due_mutex")
.write_value(instance_cache_element_stats.elements_storing_delayed_due_mutex);
transport.metric("kphp_instance_cache_elements").tag(cluster_name).tag("fetched").write_value(instance_cache_element_stats.elements_fetched);
transport.metric("kphp_instance_cache_elements").tag(cluster_name).tag("missed").write_value(instance_cache_element_stats.elements_missed);
transport.metric("kphp_instance_cache_elements").tag(cluster_name).tag("missed_earlier").write_value(instance_cache_element_stats.elements_missed_earlier);
transport.metric("kphp_instance_cache_elements").tag(cluster_name).tag("expired").write_value(instance_cache_element_stats.elements_expired);
transport.metric("kphp_instance_cache_elements").tag(cluster_name).tag("created").write_value(instance_cache_element_stats.elements_created);
transport.metric("kphp_instance_cache_elements").tag(cluster_name).tag("destroyed").write_value(instance_cache_element_stats.elements_destroyed);
transport.metric("kphp_instance_cache_elements").tag(cluster_name).tag("cached").write_value(instance_cache_element_stats.elements_cached);
.write_value(unpack(instance_cache_element_stats.elements_storing_delayed_due_mutex));
transport.metric("kphp_instance_cache_elements").tag(cluster_name).tag("fetched").write_value(unpack(instance_cache_element_stats.elements_fetched));
transport.metric("kphp_instance_cache_elements").tag(cluster_name).tag("missed").write_value(unpack(instance_cache_element_stats.elements_missed));
transport.metric("kphp_instance_cache_elements")
.tag(cluster_name)
.tag("missed_earlier")
.write_value(unpack(instance_cache_element_stats.elements_missed_earlier));
transport.metric("kphp_instance_cache_elements").tag(cluster_name).tag("expired").write_value(unpack(instance_cache_element_stats.elements_expired));
transport.metric("kphp_instance_cache_elements").tag(cluster_name).tag("created").write_value(unpack(instance_cache_element_stats.elements_created));
transport.metric("kphp_instance_cache_elements").tag(cluster_name).tag("destroyed").write_value(unpack(instance_cache_element_stats.elements_destroyed));
transport.metric("kphp_instance_cache_elements").tag(cluster_name).tag("cached").write_value(unpack(instance_cache_element_stats.elements_cached));
transport.metric("kphp_instance_cache_elements")
.tag(cluster_name)
.tag("logically_expired_and_ignored")
.write_value(instance_cache_element_stats.elements_logically_expired_and_ignored);
.write_value(unpack(instance_cache_element_stats.elements_logically_expired_and_ignored));
transport.metric("kphp_instance_cache_elements")
.tag(cluster_name)
.tag("logically_expired_but_fetched")
.write_value(instance_cache_element_stats.elements_logically_expired_but_fetched);
.write_value(unpack(instance_cache_element_stats.elements_logically_expired_but_fetched));

using namespace job_workers;
const JobStats &job_stats = vk::singleton<SharedMemoryManager>::get().get_stats();
transport.metric("kphp_workers_jobs_queue_size").tag(cluster_name).write_value(unpack(job_stats.job_queue_size));

transport.metric("kphp_workers_job_messages").tag(cluster_name).tag("reserved").write_value(job_stats.messages.count);
transport.metric("kphp_workers_job_messages").tag(cluster_name).tag("acquire_fails").write_value(unpack(job_stats.messages.acquire_fails));
transport.metric("kphp_workers_job_messages").tag(cluster_name).tag("acquire").write_value(unpack(job_stats.messages.acquired));
transport.metric("kphp_workers_job_messages").tag(cluster_name).tag("released").write_value(unpack(job_stats.messages.released));

this->add_job_workers_shared_memory_stats(cluster_name, job_stats);
}

/**
 * Reports usage of the job workers' shared memory.
 *
 * Sends the per-pool metrics for the message buffers and for each extra-memory
 * bucket, sums the memory those helpers report as used, and finally writes the
 * "limit" and "used" values of "kphp_job_workers_shared_memory".
 *
 * @param cluster_name first tag attached to every metric
 * @param job_stats    job workers statistics to report
 */
void StatsHouseClient::add_job_workers_shared_memory_stats(const char *cluster_name, const job_workers::JobStats &job_stats) {
  using namespace job_workers;

  // Size tag attached to the metrics of each extra-memory bucket, indexed by bucket number.
  constexpr std::array<const char *, JOB_EXTRA_MEMORY_BUFFER_BUCKETS> bucket_size_tags{
    "256kb", "512kb", "1mb", "2mb", "4mb", "8mb", "16mb", "32mb", "64mb",
  };

  // Start from the memory used by the message buffers, then add every extra bucket.
  size_t used_bytes = add_job_workers_shared_messages_stats(cluster_name, job_stats.messages, JOB_SHARED_MESSAGE_BYTES);
  for (size_t bucket = 0; bucket < JOB_EXTRA_MEMORY_BUFFER_BUCKETS; ++bucket) {
    const size_t bucket_buffer_size = get_extra_shared_memory_buffer_size(bucket);
    const size_t bucket_used = add_job_workers_shared_memory_buffers_stats(cluster_name, job_stats.extra_memory[bucket], bucket_size_tags[bucket],
                                                                            bucket_buffer_size);
    used_bytes += bucket_used;
  }

  transport.metric("kphp_job_workers_shared_memory").tag(cluster_name).tag("limit").write_value(job_stats.memory_limit);
  transport.metric("kphp_job_workers_shared_memory").tag(cluster_name).tag("used").write_value(used_bytes);
}

/**
 * Sends the "kphp_job_workers_shared_messages" metric (reserved, acquire_fails,
 * acquired, released) for the given buffer pool.
 *
 * @param cluster_name         first tag attached to every value
 * @param memory_buffers_stats counters of the pool being reported
 * @param buffer_size          size of one buffer of this pool
 * @return memory currently used by the pool, as computed by get_memory_used()
 */
size_t StatsHouseClient::add_job_workers_shared_messages_stats(const char *cluster_name, const job_workers::JobStats::MemoryBufferStats &memory_buffers_stats,
                                                               size_t buffer_size) {
  using namespace job_workers;

  // Read both counters once so the reported values and the returned usage come from the same readings.
  const size_t acquired_total = unpack(memory_buffers_stats.acquired);
  const size_t released_total = unpack(memory_buffers_stats.released);

  const auto send = [&](const char *counter_tag, auto counter) {
    transport.metric("kphp_job_workers_shared_messages").tag(cluster_name).tag(counter_tag).write_value(counter);
  };

  send("reserved", memory_buffers_stats.count);
  send("acquire_fails", unpack(memory_buffers_stats.acquire_fails));
  send("acquired", acquired_total);
  send("released", released_total);

  return get_memory_used(acquired_total, released_total, buffer_size);
}

/**
 * Sends the "kphp_job_workers_shared_extra_buffers" metric (reserved,
 * acquire_fails, acquired, released) for one pool of extra memory buffers.
 *
 * @param cluster_name         first tag attached to every value
 * @param memory_buffers_stats counters of the pool being reported
 * @param size_tag             second tag, identifying the pool
 * @param buffer_size          size of one buffer of this pool
 * @return memory currently used by the pool, as computed by get_memory_used()
 */
size_t StatsHouseClient::add_job_workers_shared_memory_buffers_stats(const char *cluster_name,
                                                                     const job_workers::JobStats::MemoryBufferStats &memory_buffers_stats,
                                                                     const char *size_tag, size_t buffer_size) {
  using namespace job_workers;

  // Read both counters once so the reported values and the returned usage come from the same readings.
  const size_t acquired_total = unpack(memory_buffers_stats.acquired);
  const size_t released_total = unpack(memory_buffers_stats.released);

  const auto send = [&](const char *counter_tag, auto counter) {
    transport.metric("kphp_job_workers_shared_extra_buffers").tag(cluster_name).tag(size_tag).tag(counter_tag).write_value(counter);
  };

  send("reserved", memory_buffers_stats.count);
  send("acquire_fails", unpack(memory_buffers_stats.acquire_fails));
  send("acquired", acquired_total);
  send("released", released_total);

  return get_memory_used(acquired_total, released_total, buffer_size);
}
12 changes: 12 additions & 0 deletions server/statshouse/statshouse-client.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@

#include <cassert>

#include "common/dl-utils-lite.h"
#include "common/mixin/not_copyable.h"
#include "runtime/memory_resource/memory_resource.h"
#include "server/job-workers/job-stats.h"
#include "server/workers-control.h"
#include "server/workers-stats.h"

Expand Down Expand Up @@ -37,6 +39,8 @@ class StatsHouseClient : vk::not_copyable {

void add_job_common_memory_stats(uint64_t job_common_request_memory_used, uint64_t job_common_request_real_memory_used);

void add_worker_memory_stats(WorkerType raw_worker_type, const mem_info_t &mem_stats);

/**
* Must be called from master process only
*/
Expand All @@ -46,6 +50,14 @@ class StatsHouseClient : vk::not_copyable {
private:
explicit StatsHouseClient(const std::string &ip, int port);

void add_job_workers_shared_memory_stats(const char *cluster_name, const job_workers::JobStats &job_stats);

size_t add_job_workers_shared_messages_stats(const char *cluster_name, const job_workers::JobStats::MemoryBufferStats &memory_buffers_stats,
size_t buffer_size);

size_t add_job_workers_shared_memory_buffers_stats(const char *cluster_name, const job_workers::JobStats::MemoryBufferStats &memory_buffers_stats,
const char *size_tag, size_t buffer_size);

static StatsHouseClient *inner;
statshouse::TransportUDP transport;
};

0 comments on commit 9823243

Please sign in to comment.