diff --git a/common/server/statsd-client.cpp b/common/server/statsd-client.cpp index e2cd899b9f..c40622e0ed 100644 --- a/common/server/statsd-client.cpp +++ b/common/server/statsd-client.cpp @@ -43,10 +43,6 @@ class statsd_stats_t : public stats_t { // ignore it } - bool need_aggregated_stats() noexcept final { - return true; - } - protected: void add_stat(char type, const char *key, double value) noexcept final { sb_printf(&sb, "%s.%s:", stats_prefix, normalize_key(key, "%s", "")); @@ -71,10 +67,6 @@ class statsd_stats_t : public stats_t { sb_printf(&sb, "%lld", value); sb_printf(&sb, "|%c\n", type); }; - - void add_multiple_stats(const char *key [[maybe_unused]], std::vector &&values [[maybe_unused]]) noexcept final { - assert(false && "unimplemented"); - } }; } // namespace diff --git a/common/server/tl-stats-t.h b/common/server/tl-stats-t.h index e548e28c86..b27ed535ed 100644 --- a/common/server/tl-stats-t.h +++ b/common/server/tl-stats-t.h @@ -17,10 +17,6 @@ class tl_stats_t : public stats_t { va_end(ap); } - bool need_aggregated_stats() noexcept final { - return true; - } - protected: void add_stat(char type [[maybe_unused]], const char *key, double value) noexcept final { sb_printf(&sb, "%s\t", key); @@ -45,8 +41,4 @@ class tl_stats_t : public stats_t { sb_printf(&sb, "%lld", value); sb_printf(&sb, "\n"); } - - void add_multiple_stats(const char *key [[maybe_unused]], std::vector &&values [[maybe_unused]]) noexcept final { - assert(false && "unimplemented"); - } }; diff --git a/common/stats/provider.h b/common/stats/provider.h index 4f3674aafb..d4b9e78ba5 100644 --- a/common/stats/provider.h +++ b/common/stats/provider.h @@ -67,23 +67,12 @@ class stats_t { add_gauge_stat(stat_key, value); } - void add_multiple_gauge_stats(std::vector &&values, const char *key1, const char *key2 = "") noexcept { - const size_t key1_len = std::strlen(key1); - const size_t key2_len = std::strlen(key2); - char stat_key[key1_len + key2_len + 1]; - std::memcpy(stat_key, key1, key1_len); - std::memcpy(stat_key + key1_len, key2, key2_len + 1); - - add_multiple_stats(stat_key, std::move(values)); - } - template void add_gauge_stat(const std::atomic &value, const char *key1, const char *key2 = "", const char *key3 = "") noexcept { add_gauge_stat(value.load(std::memory_order_relaxed), key1, key2, key3); } virtual void add_general_stat(const char *key, const char *value_format, ...) noexcept __attribute__((format(printf, 3, 4))) = 0; - virtual bool need_aggregated_stats() noexcept = 0; virtual ~stats_t() = default; @@ -94,8 +83,6 @@ class stats_t { virtual void add_stat_with_tag_type(char type, const char *key, const char *type_tag, double value) noexcept = 0; virtual void add_stat_with_tag_type(char type, const char *key, const char *type_tag, long long value) noexcept = 0; - virtual void add_multiple_stats(const char *key, std::vector &&values) noexcept = 0; - char *normalize_key(const char *key, const char *format, const char *prefix) noexcept; }; diff --git a/runtime/memory_resource/memory_resource.cpp b/runtime/memory_resource/memory_resource.cpp index fc7c038acc..a96314e746 100644 --- a/runtime/memory_resource/memory_resource.cpp +++ b/runtime/memory_resource/memory_resource.cpp @@ -10,10 +10,8 @@ void MemoryStats::write_stats_to(stats_t *stats, const char *prefix) const noexc stats->add_gauge_stat(memory_limit, prefix, ".memory.limit"); stats->add_gauge_stat(memory_used, prefix, ".memory.used"); stats->add_gauge_stat(real_memory_used, prefix, ".memory.real_used"); - if (stats->need_aggregated_stats()) { - stats->add_gauge_stat(max_memory_used, prefix, ".memory.used_max"); - stats->add_gauge_stat(max_real_memory_used, prefix, ".memory.real_used_max"); - } + stats->add_gauge_stat(max_memory_used, prefix, ".memory.used_max"); + stats->add_gauge_stat(max_real_memory_used, prefix, ".memory.real_used_max"); stats->add_gauge_stat(defragmentation_calls, prefix, ".memory.defragmentation_calls"); stats->add_gauge_stat(huge_memory_pieces, prefix, ".memory.huge_memory_pieces"); stats->add_gauge_stat(small_memory_pieces, prefix, ".memory.small_memory_pieces"); diff --git a/server/confdata-stats.cpp b/server/confdata-stats.cpp index 94019fa008..566a1b9624 100644 --- a/server/confdata-stats.cpp +++ b/server/confdata-stats.cpp @@ -83,9 +83,8 @@ void ConfdataStats::write_stats_to(stats_t *stats, const memory_resource::Memory stats->add_gauge_stat("confdata.updates.ignored", ignored_updates); stats->add_gauge_stat("confdata.updates.total", total_updates); - if (stats->need_aggregated_stats()) { - stats->add_gauge_stat("confdata.elements.total", total_elements); - } + stats->add_gauge_stat("confdata.elements.total", total_elements); + stats->add_gauge_stat_with_type_tag("confdata.elements", "simple_key", simple_key_elements); stats->add_gauge_stat_with_type_tag("confdata.elements", "one_dot_wildcard", one_dot_wildcard_elements); stats->add_gauge_stat_with_type_tag("confdata.elements", "two_dots_wildcard", two_dots_wildcard_elements); diff --git a/server/php-engine.cpp b/server/php-engine.cpp index 4f1649885b..a70c8e0fbc 100644 --- a/server/php-engine.cpp +++ b/server/php-engine.cpp @@ -90,7 +90,6 @@ #include "server/shared-data-worker-cache.h" #include "server/signal-handlers.h" #include "server/statshouse/statshouse-client.h" -#include "server/statshouse/worker-stats-buffer.h" #include "server/workers-control.h" using job_workers::JobWorkersContext; @@ -1423,7 +1422,6 @@ void cron() { } vk::singleton::get().on_worker_cron(); vk::singleton::get().update_this_worker_stats(); - vk::singleton::get().flush_if_needed(); } void reopen_json_log() { @@ -1677,7 +1675,6 @@ void init_all() { init_php_scripts(); vk::singleton::get().set_idle_worker_status(); - vk::singleton::get(); worker_id = (int)lrand48(); @@ -2086,11 +2083,13 @@ int main_args_handler(int i, const char *long_option) { kprintf("--%s option: can't find ':'\n", long_option); return -1; } + auto host = std::string(optarg, colon - optarg); + auto port = atoi(colon + 1); + if (host.empty()) { + host = "127.0.0.1"; + } - auto &statshouse_client = vk::singleton::get(); - statshouse_client.set_host(std::string(optarg, colon - optarg)); - statshouse_client.set_port(atoi(colon + 1)); - vk::singleton::get().enable(); + StatsHouseClient::init(host, port); return 0; } case 2027: { diff --git a/server/php-master.cpp b/server/php-master.cpp index 76a88dc4a6..3397443d44 100644 --- a/server/php-master.cpp +++ b/server/php-master.cpp @@ -64,7 +64,6 @@ #include "server/server-stats.h" #include "server/shared-data-worker-cache.h" #include "server/shared-data.h" -#include "server/statshouse/add-metrics-batch.h" #include "server/statshouse/statshouse-client.h" #include "server/workers-control.h" @@ -78,6 +77,7 @@ #include "server/job-workers/shared-memory-manager.h" #include "server/json-logger.h" #include "server/server-config.h" +#include "server/workers-stats.h" using job_workers::JobWorkersContext; @@ -101,13 +101,8 @@ static sigset_t empty_mask; static double my_now; /*** Stats ***/ -static long tot_workers_started{0}; -static long tot_workers_dead{0}; -static long tot_workers_strange_dead{0}; -static long workers_killed{0}; -static long workers_hung{0}; -static long workers_terminated{0}; -static long workers_failed{0}; + +static workers_stats_t workers_stats{}; struct CpuStatTimestamp { double timestamp; @@ -461,7 +456,7 @@ void terminate_worker(worker_info_t *w) { if (w->type == WorkerType::general_worker) { changed = 1; } - workers_terminated++; + workers_stats.workers_terminated++; } int kill_worker(WorkerType worker_type) { @@ -492,7 +487,7 @@ void kill_hanging_workers() { if (!worker->is_dying && worker->last_activity_time + get_max_hanging_time_sec() <= my_now) { tvkprintf(master_process, 1, "No stats received from %s [pid = %d]. Terminate it\n", worker->type == WorkerType::general_worker ? "general worker" : "job worker", static_cast(worker->pid)); - workers_hung++; + workers_stats.workers_hung++; terminate_worker(worker); last_terminated = my_now; break; @@ -505,7 +500,7 @@ void kill_hanging_workers() { kprintf("master kill hanging %s : send SIGKILL to [pid = %d]\n", workers[i]->type == WorkerType::general_worker ? "general worker" : "job worker", static_cast(workers[i]->pid)); kill(workers[i]->pid, SIGKILL); - workers_killed++; + workers_stats.workers_killed++; workers[i]->kill_flag = 1; @@ -606,7 +601,7 @@ int run_worker(WorkerType worker_type) { assert (vk::singleton::get().get_all_alive() < WorkersControl::max_workers_count); - tot_workers_started++; + workers_stats.tot_workers_started++; const uint16_t worker_unique_id = vk::singleton::get().on_worker_creating(worker_type); pid_t new_pid = fork(); if (new_pid == -1) { @@ -718,7 +713,7 @@ void remove_worker(pid_t pid) { if (workers[i]->type == WorkerType::general_worker && !workers[i]->is_dying) { failed++; } - workers_failed++; + workers_stats.workers_failed++; delete_worker(workers[i]); @@ -739,9 +734,9 @@ void update_workers() { pid_t pid = waitpid(-1, &status, WNOHANG); if (pid > 0) { if (!WIFEXITED (status)) { - tot_workers_strange_dead++; + workers_stats.tot_workers_strange_dead++; } - tot_workers_dead++; + workers_stats.tot_workers_dead++; remove_worker(pid); changed = 1; } else { @@ -972,13 +967,13 @@ std::string php_master_prepare_stats(bool add_worker_pids) { << "total_workers\t" << general_workers_stat.total_workers + job_workers_stat.total_workers << "\n" << "running_workers\t" << general_workers_stat.running_workers + job_workers_stat.running_workers << "\n" << "paused_workers\t" << general_workers_stat.waiting_workers + job_workers_stat.waiting_workers << "\n" - << "tot_workers_started\t" << tot_workers_started << "\n" - << "tot_workers_dead\t" << tot_workers_dead << "\n" - << "tot_workers_strange_dead\t" << tot_workers_strange_dead << "\n" - << "workers_killed\t" << workers_killed << "\n" - << "workers_hung\t" << workers_hung << "\n" - << "workers_terminated\t" << workers_terminated << "\n" - << "workers_failed\t" << workers_failed << "\n"; + << "tot_workers_started\t" << workers_stats.tot_workers_started << "\n" + << "tot_workers_dead\t" << workers_stats.tot_workers_dead << "\n" + << "tot_workers_strange_dead\t" << workers_stats.tot_workers_strange_dead << "\n" + << "workers_killed\t" << workers_stats.workers_killed << "\n" + << "workers_hung\t" << workers_stats.workers_hung << "\n" + << "workers_terminated\t" << workers_stats.workers_terminated << "\n" + << "workers_failed\t" << workers_stats.workers_failed << "\n"; stats.write_stats_to(oss, add_worker_pids); std::for_each(workers, last_worker, [&oss](const worker_info_t *w) { @@ -1121,23 +1116,21 @@ STATS_PROVIDER_TAGGED(kphp_stats, 100, stats_tag_kphp_server) { stats->add_gauge_stat("workers.job.processes.working", job_worker_group.running_workers); stats->add_gauge_stat("workers.job.processes.working_but_waiting", job_worker_group.waiting_workers); - if (stats->need_aggregated_stats()) { - auto running_stats = server_stats.misc_stat_for_general_workers[1].get_stat(); - stats->add_gauge_stat("workers.general.processes.running.avg_1m", running_stats.running_workers_avg); - stats->add_gauge_stat("workers.general.processes.running.max_1m", running_stats.running_workers_max); + auto running_stats = server_stats.misc_stat_for_general_workers[1].get_stat(); + stats->add_gauge_stat("workers.general.processes.running.avg_1m", running_stats.running_workers_avg); + stats->add_gauge_stat("workers.general.processes.running.max_1m", running_stats.running_workers_max); - running_stats = server_stats.misc_stat_for_job_workers[1].get_stat(); - stats->add_gauge_stat("workers.job.processes.running.avg_1m", running_stats.running_workers_avg); - stats->add_gauge_stat("workers.job.processes.running.max_1m", running_stats.running_workers_max); - } + running_stats = server_stats.misc_stat_for_job_workers[1].get_stat(); + stats->add_gauge_stat("workers.job.processes.running.avg_1m", running_stats.running_workers_avg); + stats->add_gauge_stat("workers.job.processes.running.max_1m", running_stats.running_workers_max); - stats->add_gauge_stat("server.workers.started", tot_workers_started); - stats->add_gauge_stat("server.workers.dead", tot_workers_dead); - stats->add_gauge_stat("server.workers.strange_dead", tot_workers_strange_dead); - stats->add_gauge_stat("server.workers.killed", workers_killed); - stats->add_gauge_stat("server.workers.hung", workers_hung); - stats->add_gauge_stat("server.workers.terminated", workers_terminated); - stats->add_gauge_stat("server.workers.failed", workers_failed); + stats->add_gauge_stat("server.workers.started", workers_stats.tot_workers_started); + stats->add_gauge_stat("server.workers.dead", workers_stats.tot_workers_dead); + stats->add_gauge_stat("server.workers.strange_dead", workers_stats.tot_workers_strange_dead); + stats->add_gauge_stat("server.workers.killed", workers_stats.workers_killed); + stats->add_gauge_stat("server.workers.hung", workers_stats.workers_hung); + stats->add_gauge_stat("server.workers.terminated", workers_stats.workers_terminated); + stats->add_gauge_stat("server.workers.failed", workers_stats.workers_failed); const auto cpu_stats = server_stats.cpu[1].get_stat(); stats->add_gauge_stat("cpu.stime", cpu_stats.cpu_s_usage); @@ -1405,7 +1398,11 @@ static void cron() { if (!other->is_alive || in_old_master_on_restart()) { // write stats at the beginning to avoid spikes in graphs send_data_to_statsd_with_prefix(vk::singleton::get().get_statsd_prefix(), stats_tag_kphp_server); - vk::singleton::get().master_send_metrics(); + if (StatsHouseClient::has()) { + const auto cpu_stats = server_stats.cpu[1].get_stat(); + StatsHouseClient::get().send_common_master_stats(workers_stats, instance_cache_get_memory_stats(), cpu_stats.cpu_s_usage, cpu_stats.cpu_u_usage, + instance_cache_memory_swaps_ok, instance_cache_memory_swaps_fail); + } } create_all_outbound_connections(); vk::singleton::get().aggregate_stats(); diff --git a/server/server-stats.cpp b/server/server-stats.cpp index ad10feb1d7..c3e8544a28 100644 --- a/server/server-stats.cpp +++ b/server/server-stats.cpp @@ -20,7 +20,6 @@ #include "server/json-logger.h" #include "server/server-stats.h" #include "server/statshouse/statshouse-client.h" -#include "server/statshouse/worker-stats-buffer.h" namespace { @@ -604,7 +603,8 @@ void ServerStats::after_fork(pid_t worker_pid, uint64_t active_connections, uint worker_type_ = worker_type; gen_->seed(worker_pid); shared_stats_->workers.reset_worker_stats(worker_pid, active_connections, max_connections, worker_process_id_); - last_update_ = std::chrono::steady_clock::now(); + last_update_aggr_stats = std::chrono::steady_clock::now(); + last_update_statshouse = std::chrono::steady_clock::now(); } void ServerStats::add_request_stats(double script_time_sec, double net_time_sec, int64_t script_queries, int64_t long_script_queries, int64_t memory_used, @@ -618,13 +618,10 @@ void ServerStats::add_request_stats(double script_time_sec, double net_time_sec, shared_stats_->workers.add_worker_stats(queries_stat, worker_process_id_); using namespace statshouse; - vk::singleton::get().add_query_stat(GenericQueryStatKey::memory_used, worker_type_, memory_used); - vk::singleton::get().add_query_stat(GenericQueryStatKey::real_memory_used, worker_type_, real_memory_used); - - vk::singleton::get().add_query_stat(GenericQueryStatKey::script_time, worker_type_, script_time.count()); - vk::singleton::get().add_query_stat(GenericQueryStatKey::net_time, worker_type_, net_time.count()); - vk::singleton::get().add_query_stat(GenericQueryStatKey::outgoing_queries, worker_type_, script_queries); - vk::singleton::get().add_query_stat(GenericQueryStatKey::outgoing_long_queries, worker_type_, long_script_queries); + if (StatsHouseClient::has()) { + StatsHouseClient::get().send_request_stats(worker_type_, script_time.count(), net_time.count(), memory_used, real_memory_used, script_queries, + long_script_queries); + } } void ServerStats::add_job_stats(double job_wait_time_sec, int64_t request_memory_used, int64_t request_real_memory_used, int64_t response_memory_used, @@ -632,27 +629,31 @@ void ServerStats::add_job_stats(double job_wait_time_sec, int64_t request_memory const auto job_wait_time = std::chrono::duration_cast(std::chrono::duration(job_wait_time_sec)); shared_stats_->job_workers.add_job_stats(job_wait_time.count(), request_memory_used, request_real_memory_used, response_memory_used, response_real_memory_used); - using namespace statshouse; - vk::singleton::get().add_query_stat(QueryStatKey::job_wait_time, job_wait_time.count()); - vk::singleton::get().add_query_stat(QueryStatKey::job_request_memory_usage, request_memory_used); - vk::singleton::get().add_query_stat(QueryStatKey::job_request_real_memory_usage, request_real_memory_used); - vk::singleton::get().add_query_stat(QueryStatKey::job_response_memory_usage, response_memory_used); - vk::singleton::get().add_query_stat(QueryStatKey::job_response_real_memory_usage, response_real_memory_used); + if (StatsHouseClient::has()) { + StatsHouseClient::get().send_job_stats(job_wait_time.count(), request_memory_used, request_real_memory_used, response_memory_used, + response_real_memory_used); + } } void ServerStats::add_job_common_memory_stats(int64_t common_request_memory_used, int64_t common_request_real_memory_used) noexcept { shared_stats_->job_workers.add_job_common_memory_stats(common_request_memory_used, common_request_real_memory_used); - using namespace statshouse; - vk::singleton::get().add_query_stat(QueryStatKey::job_common_request_memory_usage, common_request_memory_used); - vk::singleton::get().add_query_stat(QueryStatKey::job_common_request_real_memory_usage, common_request_real_memory_used); + if (StatsHouseClient::has()) { + StatsHouseClient::get().send_job_common_memory_stats(common_request_memory_used, common_request_real_memory_used); + } } void ServerStats::update_this_worker_stats() noexcept { const auto now_tp = std::chrono::steady_clock::now(); - if (now_tp - last_update_ >= std::chrono::seconds{5}) { + if (now_tp - last_update_aggr_stats >= std::chrono::seconds{5}) { shared_stats_->workers.update_worker_stats(worker_process_id_); - last_update_ = now_tp; + last_update_aggr_stats = now_tp; + } + + if (StatsHouseClient::has() && (now_tp - last_update_statshouse >= std::chrono::seconds{1})) { + auto virtual_memory_stat = get_self_mem_stats(); + StatsHouseClient::get().send_worker_memory_stats(worker_type_, virtual_memory_stat); + last_update_statshouse = now_tp; } } @@ -674,11 +675,11 @@ void ServerStats::set_running_worker_status() noexcept { void ServerStats::aggregate_stats() noexcept { const auto now_tp = std::chrono::steady_clock::now(); - if (now_tp - last_update_ < std::chrono::seconds{5}) { + if (now_tp - last_update_aggr_stats < std::chrono::seconds{5}) { return; } - last_update_ = now_tp; + last_update_aggr_stats = now_tp; const auto &workers_control = vk::singleton::get(); const uint16_t general_workers = workers_control.get_count(WorkerType::general_worker); @@ -709,30 +710,22 @@ uint64_t kb2bytes(uint64_t kb) noexcept { template void write_to(stats_t *stats, const char *prefix, const char *suffix, const AggregatedSamples &samples, const Mapper &mapper = {}) { - if (stats->need_aggregated_stats()) { - stats->add_gauge_stat(mapper(samples.percentiles.p50), prefix, suffix, ".p50"); - stats->add_gauge_stat(mapper(samples.percentiles.p75), prefix, suffix, ".p75"); - stats->add_gauge_stat(mapper(samples.percentiles.p90), prefix, suffix, ".p90"); - stats->add_gauge_stat(mapper(samples.percentiles.p95), prefix, suffix, ".p95"); - stats->add_gauge_stat(mapper(samples.percentiles.p99), prefix, suffix, ".p99"); - stats->add_gauge_stat(mapper(samples.percentiles.max), prefix, suffix, ".max"); - } + stats->add_gauge_stat(mapper(samples.percentiles.p50), prefix, suffix, ".p50"); + stats->add_gauge_stat(mapper(samples.percentiles.p75), prefix, suffix, ".p75"); + stats->add_gauge_stat(mapper(samples.percentiles.p90), prefix, suffix, ".p90"); + stats->add_gauge_stat(mapper(samples.percentiles.p95), prefix, suffix, ".p95"); + stats->add_gauge_stat(mapper(samples.percentiles.p99), prefix, suffix, ".p99"); + stats->add_gauge_stat(mapper(samples.percentiles.max), prefix, suffix, ".max"); } template void write_to(stats_t *stats, const char *prefix, const char *suffix, const WorkerSamples &samples, const Mapper &mapper = {}) { - if (stats->need_aggregated_stats()) { - stats->add_gauge_stat(mapper(samples.percentiles.p50), prefix, suffix, ".p50"); - stats->add_gauge_stat(mapper(samples.percentiles.p75), prefix, suffix, ".p75"); - stats->add_gauge_stat(mapper(samples.percentiles.p90), prefix, suffix, ".p90"); - stats->add_gauge_stat(mapper(samples.percentiles.p95), prefix, suffix, ".p95"); - stats->add_gauge_stat(mapper(samples.percentiles.p99), prefix, suffix, ".p99"); - stats->add_gauge_stat(mapper(samples.percentiles.max), prefix, suffix, ".max"); - } else { - const uint16_t workers_count = vk::singleton::get().get_total_workers_count(); - std::vector values(samples.samples.begin(), samples.samples.begin() + workers_count); - stats->add_multiple_gauge_stats(std::move(values), prefix, suffix); - } + stats->add_gauge_stat(mapper(samples.percentiles.p50), prefix, suffix, ".p50"); + stats->add_gauge_stat(mapper(samples.percentiles.p75), prefix, suffix, ".p75"); + stats->add_gauge_stat(mapper(samples.percentiles.p90), prefix, suffix, ".p90"); + stats->add_gauge_stat(mapper(samples.percentiles.p95), prefix, suffix, ".p95"); + stats->add_gauge_stat(mapper(samples.percentiles.p99), prefix, suffix, ".p99"); + stats->add_gauge_stat(mapper(samples.percentiles.max), prefix, suffix, ".max"); } void write_to(stats_t *stats, const char *prefix, const WorkerAggregatedStats &agg, const WorkerSharedStats &shared) noexcept { diff --git a/server/server-stats.h b/server/server-stats.h index c38c2dbc2d..9c5198c367 100644 --- a/server/server-stats.h +++ b/server/server-stats.h @@ -59,7 +59,8 @@ class ServerStats : vk::not_copyable { WorkerType worker_type_{WorkerType::general_worker}; uint16_t worker_process_id_{0}; - std::chrono::steady_clock::time_point last_update_; + std::chrono::steady_clock::time_point last_update_aggr_stats; + std::chrono::steady_clock::time_point last_update_statshouse; std::mt19937 *gen_{nullptr}; diff --git a/server/server.cmake b/server/server.cmake index d7290dced6..729c6f3fcb 100644 --- a/server/server.cmake +++ b/server/server.cmake @@ -30,9 +30,7 @@ prepend(KPHP_SERVER_SOURCES ${BASE_DIR}/server/ workers-control.cpp shared-data-worker-cache.cpp signal-handlers.cpp - statshouse/statshouse-client.cpp - statshouse/add-metrics-batch.cpp - statshouse/worker-stats-buffer.cpp) + statshouse/statshouse-client.cpp) prepend(KPHP_JOB_WORKERS_SOURCES ${BASE_DIR}/server/job-workers/ job-stats.cpp diff --git a/server/statshouse/add-metrics-batch.cpp b/server/statshouse/add-metrics-batch.cpp deleted file mode 100644 index 6207b36cfa..0000000000 --- a/server/statshouse/add-metrics-batch.cpp +++ /dev/null @@ -1,52 +0,0 @@ -// Compiler for PHP (aka KPHP) -// Copyright (c) 2021 LLC «V Kontakte» -// Distributed under the GPL v3 License, see LICENSE.notice.txt - -#include "server/statshouse/add-metrics-batch.h" - -#include - -#include "common/tl/constants/statshouse.h" -#include "common/tl/parse.h" -#include "common/tl/store.h" - -void StatsHouseMetric::tl_store() const { - tl_store_int(fields_mask); - vk::tl::store_string(name); - tl_store_int(tags.size()); - for (const auto &tag : tags) { - vk::tl::store_string(tag.first); - vk::tl::store_string(tag.second); - } - if (fields_mask & vk::tl::statshouse::metric_fields_mask::counter) { - tl_store_double(counter); - } - if (fields_mask & vk::tl::statshouse::metric_fields_mask::t) { - tl_store_long(t); - } - if (fields_mask & vk::tl::statshouse::metric_fields_mask::value) { - vk::tl::store_vector(value); - } - if (fields_mask & vk::tl::statshouse::metric_fields_mask::unique) { - vk::tl::store_vector(unique); - } - if (fields_mask & vk::tl::statshouse::metric_fields_mask::stop) { - vk::tl::store_vector(stop); - } -} - -void StatsHouseAddMetricsBatch::tl_store() const { - tl_store_int(TL_STATSHOUSE_ADD_METRICS_BATCH); - tl_store_int(fields_mask); - tl_store_int(metrics_size); -} - -StatsHouseMetric make_statshouse_value_metric(std::string &&name, double value, const std::vector> &tags) { - constexpr int fields_mask = vk::tl::statshouse::metric_fields_mask::value; - return {.fields_mask = fields_mask, .name = std::move(name), .tags = tags, .counter = 0, .t = 0, .value = {value}}; -} - -StatsHouseMetric make_statshouse_value_metrics(std::string &&name, std::vector &&value, const std::vector> &tags) { - constexpr int fields_mask = vk::tl::statshouse::metric_fields_mask::value; - return {.fields_mask = fields_mask, .name = std::move(name), .tags = tags, .counter = 0, .t = 0, .value = std::move(value)}; -} diff --git a/server/statshouse/add-metrics-batch.h b/server/statshouse/add-metrics-batch.h deleted file mode 100644 index 465cf7166f..0000000000 --- a/server/statshouse/add-metrics-batch.h +++ /dev/null @@ -1,32 +0,0 @@ -// Compiler for PHP (aka KPHP) -// Copyright (c) 2021 LLC «V Kontakte» -// Distributed under the GPL v3 License, see LICENSE.notice.txt - -#pragma once - -#include -#include - -struct StatsHouseMetric { - int fields_mask{0}; - std::string name; - std::vector> tags; - double counter{0.0}; // fields_mask bit #0 - long long t{}; // fields_mask bit #5 - std::vector value; // fields_mask bit #1 - std::vector unique; // fields_mask bit #2 - std::vector stop; // fields_mask bit #3 - - void tl_store() const; -}; - -struct StatsHouseAddMetricsBatch { - int fields_mask{0}; - int metrics_size{0}; - - void tl_store() const; -}; - -StatsHouseMetric make_statshouse_value_metric(std::string &&name, double value, const std::vector> &tags); - -StatsHouseMetric make_statshouse_value_metrics(std::string &&name, std::vector &&value, const std::vector> &tags); diff --git a/server/statshouse/statshouse-client.cpp b/server/statshouse/statshouse-client.cpp index 2f52223e4d..04291aad78 100644 --- a/server/statshouse/statshouse-client.cpp +++ b/server/statshouse/statshouse-client.cpp @@ -1,164 +1,209 @@ // Compiler for PHP (aka KPHP) -// Copyright (c) 2021 LLC «V Kontakte» +// Copyright (c) 2023 LLC «V Kontakte» // Distributed under the GPL v3 License, see LICENSE.notice.txt #include "server/statshouse/statshouse-client.h" -#include -#include - -#include "common/resolver.h" -#include "common/server/stats.h" -#include "common/stats/provider.h" -#include "common/tl/constants/statshouse.h" -#include "common/tl/methods/string.h" -#include "net/net-connections.h" -#include "runtime/critical_section.h" +#include "common/precise-time.h" +#include "runtime/instance-cache.h" +#include "server/job-workers/shared-memory-manager.h" +#include "server/json-logger.h" #include "server/server-config.h" -#include "server/server-log.h" +#include "server/server-stats.h" -namespace { +StatsHouseClient *StatsHouseClient::inner = nullptr; -constexpr int STATSHOUSE_HEADER_OFFSET = 3 * sizeof(int32_t); // for magic, fields_mask and vector size -constexpr int STATSHOUSE_UDP_BUFFER_THRESHOLD = static_cast(65507 * 0.8); // 80% of max UDP packet size +template +T unpack(const std::atomic &value) { + return value.load(std::memory_order_relaxed); +} -class statshouse_stats_t : public stats_t { -public: - explicit statshouse_stats_t(const std::vector> &tags) - : tags(tags) {} +inline size_t get_memory_used(size_t acquired, size_t released, size_t buffer_size) { + return acquired > released ? (acquired - released) * buffer_size : 0; +} - void add_general_stat(const char *, const char *, ...) noexcept final { - // ignore it - } +StatsHouseClient::StatsHouseClient(const std::string &ip, int port) + : transport(ip, port){}; - bool need_aggregated_stats() noexcept final { - return false; - } +void StatsHouseClient::send_request_stats(WorkerType raw_worker_type, uint64_t script_time_ns, uint64_t net_time_ns, uint64_t memory_used, + uint64_t real_memory_used, uint64_t script_queries, uint64_t long_script_queries) { + const char *cluster_name = vk::singleton::get().get_cluster_name(); + const char *worker_type = raw_worker_type == WorkerType::general_worker ? "general" : "job"; + transport.metric("kphp_request_time").tag(cluster_name).tag("script").tag(worker_type).write_value(script_time_ns); + transport.metric("kphp_request_time").tag(cluster_name).tag("net").tag(worker_type).write_value(net_time_ns); - void flush() { - auto metrics_batch = StatsHouseAddMetricsBatch{.fields_mask = vk::tl::statshouse::add_metrics_batch_fields_mask::ALL, .metrics_size = counter}; - vk::tl::store_to_buffer(sb.buff, STATSHOUSE_HEADER_OFFSET, metrics_batch); - vk::singleton::get().send_metrics(sb.buff, sb.pos); + transport.metric("kphp_memory_script_usage").tag(cluster_name).tag("used").tag(worker_type).write_value(memory_used); + transport.metric("kphp_memory_script_usage").tag(cluster_name).tag("real_used").tag(worker_type).write_value(real_memory_used); - sb.pos = STATSHOUSE_HEADER_OFFSET; - counter = 0; - } + transport.metric("kphp_requests_outgoing_queries").tag(cluster_name).tag(worker_type).write_value(script_queries); + transport.metric("kphp_requests_outgoing_long_queries").tag(cluster_name).tag(worker_type).write_value(long_script_queries); +} - void flush_if_needed() { - if (sb.pos < STATSHOUSE_UDP_BUFFER_THRESHOLD) { - return; - } - flush(); - } +void StatsHouseClient::send_job_stats(uint64_t job_wait_ns, uint64_t request_memory_used, uint64_t request_real_memory_used, uint64_t response_memory_used, + uint64_t response_real_memory_used) { + const char *cluster_name = vk::singleton::get().get_cluster_name(); + transport.metric("kphp_job_queue_time").tag(cluster_name).write_value(job_wait_ns); -protected: - void add_stat(char type [[maybe_unused]], const char *key, double value) noexcept final { - auto metric = make_statshouse_value_metric(normalize_key(key, "_%s", stats_prefix), value, tags); - auto len = vk::tl::store_to_buffer(sb.buff + sb.pos, sb.size, metric); - sb.pos += len; - ++counter; - flush_if_needed(); - } + transport.metric("kphp_job_request_memory_usage").tag(cluster_name).tag("used").write_value(request_memory_used); + transport.metric("kphp_job_request_memory_usage").tag(cluster_name).tag("real_used").write_value(request_real_memory_used); - void add_stat(char type, const char *key, long long value) noexcept final { - add_stat(type, key, static_cast(value)); - } + transport.metric("kphp_job_response_memory_usage").tag(cluster_name).tag("used").write_value(response_memory_used); + transport.metric("kphp_job_response_memory_usage").tag(cluster_name).tag("real_used").write_value(response_real_memory_used); +} - void add_stat_with_tag_type(char type [[maybe_unused]], const char *key, const char *type_tag, double value) noexcept final { - std::vector> metric_tags = {{"type", std::string(type_tag)}, {"host", std::string(kdb_gethostname())}}; - auto metric = make_statshouse_value_metric(normalize_key(key, "_%s", stats_prefix), value, metric_tags); - auto len = vk::tl::store_to_buffer(sb.buff + sb.pos, sb.size, metric); - sb.pos += len; - ++counter; - flush_if_needed(); - } +void StatsHouseClient::send_job_common_memory_stats(uint64_t job_common_request_memory_used, uint64_t job_common_request_real_memory_used) { + const char *cluster_name = vk::singleton::get().get_cluster_name(); + transport.metric("kphp_job_common_request_memory").tag(cluster_name).tag("used").write_value(job_common_request_memory_used); + transport.metric("kphp_job_common_request_memory").tag(cluster_name).tag("real_used").write_value(job_common_request_real_memory_used); +} - void add_stat_with_tag_type(char type, const char *key, const char *type_tag, long long value) noexcept final { - add_stat_with_tag_type(type, key, type_tag, static_cast(value)); - } +void StatsHouseClient::send_worker_memory_stats(WorkerType raw_worker_type, const mem_info_t &mem_stats) { + const char *cluster_name = vk::singleton::get().get_cluster_name(); + const char *worker_type = raw_worker_type == WorkerType::general_worker ? "general" : "job"; + transport.metric("kphp_workers_memory").tag(cluster_name).tag(worker_type).tag("vm_peak").write_value(mem_stats.vm_peak); + transport.metric("kphp_workers_memory").tag(cluster_name).tag(worker_type).tag("vm").write_value(mem_stats.vm); + transport.metric("kphp_workers_memory").tag(cluster_name).tag(worker_type).tag("rss").write_value(mem_stats.rss); + transport.metric("kphp_workers_memory").tag(cluster_name).tag(worker_type).tag("rss_peak").write_value(mem_stats.rss_peak); +} - void add_multiple_stats(const char *key, std::vector &&values) noexcept final { - auto metric = make_statshouse_value_metrics(normalize_key(key, "_%s", stats_prefix), std::move(values), tags); - auto len = vk::tl::store_to_buffer(sb.buff + sb.pos, sb.size - sb.pos, metric); - sb.pos += len; - ++counter; - flush_if_needed(); +void StatsHouseClient::send_common_master_stats(const workers_stats_t &workers_stats, const memory_resource::MemoryStats &memory_stats, double cpu_s_usage, + double cpu_u_usage, long long int instance_cache_memory_swaps_ok, + long long int instance_cache_memory_swaps_fail) { + const char *cluster_name = vk::singleton::get().get_cluster_name(); + if (engine_tag) { + transport.metric("kphp_version").tag(cluster_name).write_value(atoll(engine_tag)); + } + + transport.metric("kphp_uptime").tag(cluster_name).write_value(get_uptime()); + + const auto general_worker_group = vk::singleton::get().collect_workers_stat(WorkerType::general_worker); + transport.metric("kphp_workers_general_processes").tag(cluster_name).tag("working").write_value(general_worker_group.running_workers); + transport.metric("kphp_workers_general_processes").tag(cluster_name).tag("working_but_waiting").write_value(general_worker_group.waiting_workers); + transport.metric("kphp_workers_general_processes").tag(cluster_name).tag("ready_for_accept").write_value(general_worker_group.ready_for_accept_workers); + + const auto job_worker_group = vk::singleton::get().collect_workers_stat(WorkerType::job_worker); + transport.metric("kphp_workers_job_processes").tag(cluster_name).tag("working").write_value(job_worker_group.running_workers); + transport.metric("kphp_workers_job_processes").tag(cluster_name).tag("working_but_waiting").write_value(job_worker_group.waiting_workers); + + transport.metric("kphp_server_workers").tag(cluster_name).tag("started").write_value(workers_stats.tot_workers_started); + transport.metric("kphp_server_workers").tag(cluster_name).tag("dead").write_value(workers_stats.tot_workers_dead); + transport.metric("kphp_server_workers").tag(cluster_name).tag("strange_dead").write_value(workers_stats.tot_workers_strange_dead); + transport.metric("kphp_server_workers").tag(cluster_name).tag("killed").write_value(workers_stats.workers_killed); + transport.metric("kphp_server_workers").tag(cluster_name).tag("hung").write_value(workers_stats.workers_hung); + transport.metric("kphp_server_workers").tag(cluster_name).tag("terminated").write_value(workers_stats.workers_terminated); + transport.metric("kphp_server_workers").tag(cluster_name).tag("failed").write_value(workers_stats.workers_failed); + + transport.metric("kphp_cpu_usage").tag(cluster_name).tag("stime").write_value(cpu_s_usage); + transport.metric("kphp_cpu_usage").tag(cluster_name).tag("utime").write_value(cpu_u_usage); + + auto total_workers_json_count = vk::singleton::get().collect_json_count_stat(); + uint64_t master_json_logs_count = vk::singleton::get().get_json_logs_count(); + transport.metric("kphp_server_total_json_logs_count").tag(cluster_name).write_value(std::get<0>(total_workers_json_count) + master_json_logs_count); + transport.metric("kphp_server_total_json_traces_count").tag(cluster_name).write_value(std::get<1>(total_workers_json_count)); + + transport.metric("kphp_instance_cache_memory").tag(cluster_name).tag("limit").write_value(memory_stats.memory_limit); + transport.metric("kphp_instance_cache_memory").tag(cluster_name).tag("used").write_value(memory_stats.memory_used); + transport.metric("kphp_instance_cache_memory").tag(cluster_name).tag("real_used").write_value(memory_stats.real_memory_used); + + transport.metric("kphp_instance_cache_memory_defragmentation_calls").tag(cluster_name).write_value(memory_stats.defragmentation_calls); + + transport.metric("kphp_instance_cache_memory_pieces").tag(cluster_name).tag("huge").write_value(memory_stats.huge_memory_pieces); + transport.metric("kphp_instance_cache_memory_pieces").tag(cluster_name).tag("small").write_value(memory_stats.small_memory_pieces); + + transport.metric("kphp_instance_cache_memory_buffer_swaps").tag(cluster_name).tag("ok").write_value(instance_cache_memory_swaps_ok); + transport.metric("kphp_instance_cache_memory_buffer_swaps").tag(cluster_name).tag("fail").write_value(instance_cache_memory_swaps_fail); + + const auto &instance_cache_element_stats = instance_cache_get_stats(); + transport.metric("kphp_instance_cache_elements").tag(cluster_name).tag("stored").write_value(unpack(instance_cache_element_stats.elements_stored)); + transport.metric("kphp_instance_cache_elements") + .tag(cluster_name) + .tag("stored_with_delay") + .write_value(unpack(instance_cache_element_stats.elements_stored_with_delay)); + transport.metric("kphp_instance_cache_elements") + .tag(cluster_name) + .tag("storing_skipped_due_recent_update") + .write_value(unpack(instance_cache_element_stats.elements_storing_skipped_due_recent_update)); + transport.metric("kphp_instance_cache_elements") + .tag(cluster_name) + .tag("storing_delayed_due_mutex") + .write_value(unpack(instance_cache_element_stats.elements_storing_delayed_due_mutex)); + transport.metric("kphp_instance_cache_elements").tag(cluster_name).tag("fetched").write_value(unpack(instance_cache_element_stats.elements_fetched)); + transport.metric("kphp_instance_cache_elements").tag(cluster_name).tag("missed").write_value(unpack(instance_cache_element_stats.elements_missed)); + transport.metric("kphp_instance_cache_elements") + .tag(cluster_name) + .tag("missed_earlier") + .write_value(unpack(instance_cache_element_stats.elements_missed_earlier)); + transport.metric("kphp_instance_cache_elements").tag(cluster_name).tag("expired").write_value(unpack(instance_cache_element_stats.elements_expired)); + transport.metric("kphp_instance_cache_elements").tag(cluster_name).tag("created").write_value(unpack(instance_cache_element_stats.elements_created)); + transport.metric("kphp_instance_cache_elements").tag(cluster_name).tag("destroyed").write_value(unpack(instance_cache_element_stats.elements_destroyed)); + transport.metric("kphp_instance_cache_elements").tag(cluster_name).tag("cached").write_value(unpack(instance_cache_element_stats.elements_cached)); + transport.metric("kphp_instance_cache_elements") + .tag(cluster_name) + .tag("logically_expired_and_ignored") + .write_value(unpack(instance_cache_element_stats.elements_logically_expired_and_ignored)); + transport.metric("kphp_instance_cache_elements") + .tag(cluster_name) + .tag("logically_expired_but_fetched") + .write_value(unpack(instance_cache_element_stats.elements_logically_expired_but_fetched)); + + using namespace job_workers; + if (vk::singleton::get().is_initialized()) { + const JobStats &job_stats = vk::singleton::get().get_stats(); + transport.metric("kphp_workers_jobs_queue_size").tag(cluster_name).write_value(unpack(job_stats.job_queue_size)); + this->add_job_workers_shared_memory_stats(cluster_name, job_stats); } - -private: - int counter{0}; - const std::vector> &tags; -}; - -} // namespace - -void StatsHouseClient::set_port(int value) { - this->port = value; } -void StatsHouseClient::set_host(std::string value) { - this->host = std::move(value); -} +void StatsHouseClient::add_job_workers_shared_memory_stats(const char *cluster_name, const job_workers::JobStats &job_stats) { + using namespace job_workers; -bool StatsHouseClient::init_connection() { - if (sock_fd <= 0) { - sock_fd = socket(AF_INET, SOCK_DGRAM, 0); - if (sock_fd < 0) { - log_server_error("Can't create statshouse socket"); - return false; - } - } - fcntl(sock_fd, F_SETFL, O_NONBLOCK); + size_t total_used = this->add_job_workers_shared_messages_stats(cluster_name, job_stats.messages, JOB_SHARED_MESSAGE_BYTES); - hostent *h; - std::string hostname = host.empty() ? "localhost" : host; - if (!(h = gethostbyname(hostname.c_str())) || h->h_addrtype != AF_INET || h->h_length != 4 || !h->h_addr_list || !h->h_addr) { - log_server_error("Can't resolve statshouse host: %s", host.c_str()); - return false; + constexpr std::array extra_memory_prefixes{ + "256kb", "512kb", "1mb", "2mb", "4mb", "8mb", "16mb", "32mb", "64mb", + }; + for (size_t i = 0; i != JOB_EXTRA_MEMORY_BUFFER_BUCKETS; ++i) { + const size_t buffer_size = get_extra_shared_memory_buffer_size(i); + total_used += this->add_job_workers_shared_memory_buffers_stats(cluster_name, job_stats.extra_memory[i], extra_memory_prefixes[i], buffer_size); } - struct sockaddr_in addr {}; - memset(&addr, 0, sizeof(addr)); - addr.sin_family = AF_INET; - addr.sin_port = htons(port); - addr.sin_addr.s_addr = (*reinterpret_cast(h->h_addr)); - if (connect(sock_fd, reinterpret_cast(&addr), sizeof(addr)) < 0) { - log_server_error("Can't connect to statshouse host: %s", hostname.c_str()); - return false; - } - return true; -} -void StatsHouseClient::master_send_metrics() { - if (port == 0 || (sock_fd <= 0 && !init_connection())) { - return; - } - const char *cluster_name = vk::singleton::get().get_cluster_name(); - const std::vector> tags = {{"cluster", cluster_name}}; - statshouse_stats_t stats{tags}; - stats.stats_prefix = "kphp"; - char *buf = get_engine_default_prepare_stats_buffer(); - - sb_init(&stats.sb, buf, STATS_BUFFER_LEN); - stats.sb.pos = STATSHOUSE_HEADER_OFFSET; - prepare_common_stats_with_tag_mask(&stats, stats_tag_kphp_server); - stats.flush(); + transport.metric("kphp_job_workers_shared_memory").tag(cluster_name).tag("limit").write_value(job_stats.memory_limit); + transport.metric("kphp_job_workers_shared_memory").tag(cluster_name).tag("used").write_value(total_used); } -void StatsHouseClient::send_metrics(char *result, int len) { - if (port == 0 || (sock_fd <= 0 && !init_connection())) { - return; - } +size_t StatsHouseClient::add_job_workers_shared_messages_stats(const char *cluster_name, const job_workers::JobStats::MemoryBufferStats &memory_buffers_stats, + size_t buffer_size) { + using namespace job_workers; - ssize_t slen = send(sock_fd, result, len, 0); - if (slen < 0) { - log_server_error("Can't send metrics to statshouse (len = %i): %s", len, strerror(errno)); - } -} + const size_t acquired_buffers = unpack(memory_buffers_stats.acquired); + const size_t released_buffers = unpack(memory_buffers_stats.released); + const size_t memory_used = get_memory_used(acquired_buffers, released_buffers, buffer_size); -StatsHouseClient::StatsHouseClient() {} + transport.metric("kphp_job_workers_shared_messages").tag(cluster_name).tag("reserved").write_value(memory_buffers_stats.count); + transport.metric("kphp_job_workers_shared_messages").tag(cluster_name).tag("acquire_fails").write_value(unpack(memory_buffers_stats.acquire_fails)); + transport.metric("kphp_job_workers_shared_messages").tag(cluster_name).tag("acquired").write_value(acquired_buffers); + transport.metric("kphp_job_workers_shared_messages").tag(cluster_name).tag("released").write_value(released_buffers); -StatsHouseClient::~StatsHouseClient() { - if (sock_fd > 0) { - close(sock_fd); - } + return memory_used; +} + +size_t StatsHouseClient::add_job_workers_shared_memory_buffers_stats(const char *cluster_name, + const job_workers::JobStats::MemoryBufferStats &memory_buffers_stats, const char *size_tag, + size_t buffer_size) { + using namespace job_workers; + + const size_t acquired_buffers = unpack(memory_buffers_stats.acquired); + const size_t released_buffers = unpack(memory_buffers_stats.released); + const size_t memory_used = get_memory_used(acquired_buffers, released_buffers, buffer_size); + + transport.metric("kphp_job_workers_shared_extra_buffers").tag(cluster_name).tag(size_tag).tag("reserved").write_value(memory_buffers_stats.count); + transport.metric("kphp_job_workers_shared_extra_buffers") + .tag(cluster_name) + .tag(size_tag) + .tag("acquire_fails") + .write_value(unpack(memory_buffers_stats.acquire_fails)); + transport.metric("kphp_job_workers_shared_extra_buffers").tag(cluster_name).tag(size_tag).tag("acquired").write_value(acquired_buffers); + transport.metric("kphp_job_workers_shared_extra_buffers").tag(cluster_name).tag(size_tag).tag("released").write_value(released_buffers); + + return memory_used; } diff --git a/server/statshouse/statshouse-client.h b/server/statshouse/statshouse-client.h index bf9f5ff849..aa7e44b816 100644 --- a/server/statshouse/statshouse-client.h +++ b/server/statshouse/statshouse-client.h @@ -1,32 +1,63 @@ // Compiler for PHP (aka KPHP) -// Copyright (c) 2021 LLC «V Kontakte» +// Copyright (c) 2023 LLC «V Kontakte» // Distributed under the GPL v3 License, see LICENSE.notice.txt #pragma once -#include "server/statshouse/add-metrics-batch.h" +#include "third-party/statshouse.h" +#include + +#include "common/dl-utils-lite.h" #include "common/mixin/not_copyable.h" -#include "common/smart_ptrs/singleton.h" +#include "runtime/memory_resource/memory_resource.h" +#include "server/job-workers/job-stats.h" +#include "server/workers-control.h" +#include "server/workers-stats.h" class StatsHouseClient : vk::not_copyable { public: - void set_port(int value); - void set_host(std::string value); + static void init(std::string ip, int port) { + static StatsHouseClient client{ip, port}; + inner = &client; + } + + static bool has() { + return inner != nullptr; + } + + static StatsHouseClient &get() { + assert(inner); + return *inner; + } + + void send_request_stats(WorkerType raw_worker_type, uint64_t script_time_ns, uint64_t net_time_ns, uint64_t memory_used, uint64_t real_memory_used, + uint64_t script_queries, uint64_t long_script_queries); + + void send_job_stats(uint64_t job_wait_ns, uint64_t request_memory_used, uint64_t request_real_memory_used, uint64_t response_memory_used, + uint64_t response_real_memory_used); + + void send_job_common_memory_stats(uint64_t job_common_request_memory_used, uint64_t job_common_request_real_memory_used); + + void send_worker_memory_stats(WorkerType raw_worker_type, const mem_info_t &mem_stats); + /** * Must be called from master process only */ - void master_send_metrics(); - void send_metrics(char* result, int len); + void send_common_master_stats(const workers_stats_t &workers_stats, const memory_resource::MemoryStats &memory_stats, double cpu_s_usage, double cpu_u_usage, + long long int instance_cache_memory_swaps_ok, long long int instance_cache_memory_swaps_fail); + private: - int port = 0; - int sock_fd = 0; - std::string host; + explicit StatsHouseClient(const std::string &ip, int port); + + void add_job_workers_shared_memory_stats(const char *cluster_name, const job_workers::JobStats &job_stats); - StatsHouseClient(); - ~StatsHouseClient(); + size_t add_job_workers_shared_messages_stats(const char *cluster_name, const job_workers::JobStats::MemoryBufferStats &memory_buffers_stats, + size_t buffer_size); - bool init_connection(); + size_t add_job_workers_shared_memory_buffers_stats(const char *cluster_name, const job_workers::JobStats::MemoryBufferStats &memory_buffers_stats, + const char *size_tag, size_t buffer_size); - friend class vk::singleton; + static StatsHouseClient *inner; + statshouse::TransportUDP transport; }; diff --git a/server/statshouse/worker-stats-buffer.cpp b/server/statshouse/worker-stats-buffer.cpp deleted file mode 100644 index 07c75e308c..0000000000 --- a/server/statshouse/worker-stats-buffer.cpp +++ /dev/null @@ -1,132 +0,0 @@ -// Compiler for PHP (aka KPHP) -// Copyright (c) 2022 LLC «V Kontakte» -// Distributed under the GPL v3 License, see LICENSE.notice.txt - -#include "worker-stats-buffer.h" -#include "common/resolver.h" -#include "common/tl/constants/statshouse.h" -#include "runtime/critical_section.h" -#include "server/server-config.h" -#include "statshouse-client.h" - -namespace statshouse { - -constexpr size_t buffer_size = 1 << 20; -static char buffer[buffer_size]; - -bool StatsBuffer::is_need_to_flush() { - return size >= data.size(); -} - -bool StatsBuffer::empty() const { - return size == 0; -} - -void StatsBuffer::add_stat(double value) { - data[size++] = value; -} - -std::vector StatsBuffer::get_data_and_reset_buffer() { - std::vector result(data.begin(), data.begin() + size); - size = 0; - return result; -} - -WorkerStatsBuffer::WorkerStatsBuffer() : last_send_time(std::chrono::steady_clock::now()) { - -} - -void WorkerStatsBuffer::add_query_stat(GenericQueryStatKey key, WorkerType worker_type, double value) { - if (!enabled) { - return; - } - auto &worker_type_buffer = generic_query_stats[static_cast(worker_type)]; - auto &stats_buffer = worker_type_buffer[static_cast(key)]; - - if (stats_buffer.is_need_to_flush()) { - flush(); - } - stats_buffer.add_stat(value); -} - -void WorkerStatsBuffer::add_query_stat(QueryStatKey key, double value) { - if (!enabled) { - return; - } - auto &stats_buffer = query_stats[static_cast(key)]; - - if (stats_buffer.is_need_to_flush()) { - flush(); - } - stats_buffer.add_stat(value); -} - -void WorkerStatsBuffer::make_generic_metric(std::vector &metrics, const char *name, GenericQueryStatKey stat_key, size_t worker_type, - const std::vector &tags) { - auto &stats_buffer = generic_query_stats[worker_type][static_cast(stat_key)]; - if (!stats_buffer.empty()) { - metrics.push_back(make_statshouse_value_metrics(name, stats_buffer.get_data_and_reset_buffer(), tags)); - } -} - -void WorkerStatsBuffer::make_metric(std::vector &metrics, const char *name, QueryStatKey stat_key, const std::vector &tags) { - auto &stats_buffer = query_stats[static_cast(stat_key)]; - if (!stats_buffer.empty()) { - metrics.push_back(make_statshouse_value_metrics(name, stats_buffer.get_data_and_reset_buffer(), tags)); - } -} - -void WorkerStatsBuffer::flush() { - dl::CriticalSectionGuard critical_section; - std::vector metrics; - const char *cluster_name = vk::singleton::get().get_cluster_name(); - - for (size_t i = 0; i < static_cast(WorkerType::types_count); ++i) { - std::vector tags; - tags.emplace_back("cluster", cluster_name); - tags.emplace_back("worker_type", i == static_cast(WorkerType::general_worker) ? "general" : "job"); - - make_generic_metric(metrics, "kphp_requests_outgoing_queries", GenericQueryStatKey::outgoing_queries, i, tags); - make_generic_metric(metrics, "kphp_requests_outgoing_long_queries", GenericQueryStatKey::outgoing_long_queries, i, tags); - make_generic_metric(metrics, "kphp_requests_script_time", GenericQueryStatKey::script_time, i, tags); - make_generic_metric(metrics, "kphp_requests_net_time", GenericQueryStatKey::net_time, i, tags); - - make_generic_metric(metrics, "kphp_memory_script_usage", GenericQueryStatKey::memory_used, i, tags); - make_generic_metric(metrics, "kphp_memory_script_real_usage", GenericQueryStatKey::real_memory_used, i, tags); - } - - std::vector tags; - tags.emplace_back("cluster_name", cluster_name); - - make_metric(metrics, "kphp_jobs_queue_time", QueryStatKey::job_wait_time, tags); - make_metric(metrics, "kphp_memory_job_request_usage", QueryStatKey::job_request_memory_usage, tags); - make_metric(metrics, "kphp_memory_job_request_real_usage", QueryStatKey::job_request_real_memory_usage, tags); - make_metric(metrics, "kphp_memory_job_response_usage", QueryStatKey::job_response_memory_usage, tags); - make_metric(metrics, "kphp_memory_job_response_real_usage", QueryStatKey::job_response_real_memory_usage, tags); - - make_metric(metrics, "kphp_memory_job_common_request_usage", QueryStatKey::job_common_request_memory_usage, tags); - make_metric(metrics, "kphp_memory_job_common_request_real_usage", QueryStatKey::job_common_request_real_memory_usage, tags); - - if (metrics.empty()) { - last_send_time = std::chrono::steady_clock::now(); - return; - } - - int offset = vk::tl::store_to_buffer(buffer, buffer_size, TL_STATSHOUSE_ADD_METRICS_BATCH); - offset = offset + vk::tl::store_to_buffer(buffer + offset, buffer_size - offset, static_cast(vk::tl::statshouse::add_metrics_batch_fields_mask::ALL)); - int len = vk::tl::store_to_buffer(buffer + offset, buffer_size - offset, metrics); - vk::singleton::get().send_metrics(buffer, offset + len); - last_send_time = std::chrono::steady_clock::now(); -} - -void WorkerStatsBuffer::flush_if_needed() { - const auto now_tp = std::chrono::steady_clock::now(); - if (enabled && (now_tp - last_send_time >= std::chrono::seconds{1})) { - vk::singleton::get().flush(); - } -} -void WorkerStatsBuffer::enable() { - enabled = true; -} - -} // namespace statshouse diff --git a/server/statshouse/worker-stats-buffer.h b/server/statshouse/worker-stats-buffer.h deleted file mode 100644 index 970c1c1b35..0000000000 --- a/server/statshouse/worker-stats-buffer.h +++ /dev/null @@ -1,77 +0,0 @@ -// Compiler for PHP (aka KPHP) -// Copyright (c) 2022 LLC «V Kontakte» -// Distributed under the GPL v3 License, see LICENSE.notice.txt - -#pragma once - -#include -#include -#include - -#include "add-metrics-batch.h" -#include "common/mixin/not_copyable.h" -#include "common/tl/methods/string.h" -#include "server/workers-control.h" - -namespace statshouse { - -enum class GenericQueryStatKey { - outgoing_queries, - outgoing_long_queries, - script_time, - net_time, - - memory_used, - real_memory_used, - total_allocated_by_curl, - - types_count -}; - -enum class QueryStatKey { - job_wait_time, - job_request_memory_usage, - job_request_real_memory_usage, - job_response_memory_usage, - job_response_real_memory_usage, - - job_common_request_memory_usage, - job_common_request_real_memory_usage, - - types_count -}; - -class StatsBuffer : vk::not_copyable { -public: - bool is_need_to_flush(); - bool empty() const; - void add_stat(double value); - std::vector get_data_and_reset_buffer(); - -private: - std::array data; - size_t size{0}; -}; - -class WorkerStatsBuffer : vk::not_copyable { -public: - WorkerStatsBuffer(); - void add_query_stat(GenericQueryStatKey key, WorkerType worker_type, double value); - void add_query_stat(QueryStatKey key, double value); - void flush_if_needed(); - void enable(); - using tag = std::pair; - -private: - void flush(); - void make_generic_metric(std::vector &metrics, const char *name, GenericQueryStatKey stat_key, size_t worker_type, - const std::vector &tags); - void make_metric(std::vector &metrics, const char *name, QueryStatKey stat_key, const std::vector &tags); - - std::array(GenericQueryStatKey::types_count)>, static_cast(WorkerType::types_count)> generic_query_stats; - std::array(QueryStatKey::types_count)> query_stats; - std::chrono::steady_clock::time_point last_send_time; - bool enabled = false; -}; - -} // namespace statshouse diff --git a/server/workers-stats.h b/server/workers-stats.h new file mode 100644 index 0000000000..c4c79f0505 --- /dev/null +++ b/server/workers-stats.h @@ -0,0 +1,15 @@ +// Compiler for PHP (aka KPHP) +// Copyright (c) 2023 LLC «V Kontakte» +// Distributed under the GPL v3 License, see LICENSE.notice.txt + +#pragma once + +struct workers_stats_t { + long tot_workers_started{0}; + long tot_workers_dead{0}; + long tot_workers_strange_dead{0}; + long workers_killed{0}; + long workers_hung{0}; + long workers_terminated{0}; + long workers_failed{0}; +}; diff --git a/tests/python/lib/engine.py b/tests/python/lib/engine.py index 9df7a1c72a..d5586d047b 100644 --- a/tests/python/lib/engine.py +++ b/tests/python/lib/engine.py @@ -9,7 +9,7 @@ import json from .colors import cyan -from .stats_receiver import StatsReceiver +from .stats_receiver import StatsReceiver, StatsType from .port_generator import get_port from .tl_client import send_rpc_request @@ -28,7 +28,7 @@ def __init__(self, engine_bin, working_dir, options=None): self._working_dir = working_dir self._engine_name = os.path.basename(engine_bin).replace('-', '_') self._log_file = os.path.join(working_dir, self._engine_name + ".log") - self._stats_receiver = StatsReceiver(self._engine_name, working_dir) + self._stats_receiver = StatsReceiver(self._engine_name, working_dir, StatsType.STATSD) self._rpc_port = get_port() self._options = { "--log": self._log_file, @@ -109,7 +109,7 @@ def create_binlog(self): def start(self, start_msgs=None): """ - Запустить дижек + Запустить движок :param start_msgs: Сообщение в логе, которое нужно проверить после запуска движка """ self._stats_receiver.start() @@ -155,7 +155,7 @@ def start(self, start_msgs=None): def stop(self): """ - Остановить движек и проверить, что все в порядке + Остановить движок и проверить, что все в порядке """ if self._engine_process is None or not self._engine_process.is_running(): return diff --git a/tests/python/lib/stats_receiver.py b/tests/python/lib/stats_receiver.py index 0e5edf3e40..f1acc3ff25 100644 --- a/tests/python/lib/stats_receiver.py +++ b/tests/python/lib/stats_receiver.py @@ -4,21 +4,28 @@ import time import re from sys import platform +from enum import Enum import psutil from .port_generator import get_port +class StatsType(Enum): + STATSD = 0 + STATSHOUSE = 1 + + class StatsReceiver: - def __init__(self, engine_name, working_dir): + def __init__(self, engine_name, working_dir, stats_type): self._working_dir = working_dir self._port = get_port() self._stats_proc = None - self._stats_file = os.path.join(working_dir, engine_name + ".stats") + self._stats_type = stats_type + self._stats_file = os.path.join(working_dir, engine_name + "." + stats_type.name.lower()) self._stats_file_write_fd = None self._stats_file_read_fd = None - self._stats = {} + self._stats = {} if stats_type == StatsType.STATSD else "" @property def port(self): @@ -31,9 +38,11 @@ def stats(self): def start(self): print("\nStarting stats receiver on port {}".format(self._port)) self._stats_file_write_fd = open(self._stats_file, 'wb') - self._stats_file_read_fd = open(self._stats_file, 'r') + self._stats_file_read_fd = open(self._stats_file, 'r', + errors="replace" if self._stats_type == StatsType.STATSHOUSE else "strict") self._stats_proc = psutil.Popen( - ["nc", "-l", "" if platform == "darwin" else "-p", str(self._port)], + ["nc", "-l{}".format("u" if self._stats_type == StatsType.STATSHOUSE else ""), + "" if platform == "darwin" else "-p", str(self._port)], stdout=self._stats_file_write_fd, stderr=subprocess.STDOUT, cwd=self._working_dir @@ -42,7 +51,7 @@ def start(self): self._stats_file_write_fd.close() possible_error = self._stats_file_read_fd.read() or "empty out" self._stats_file_read_fd.close() - RuntimeError("Can't start stats receiver: " + possible_error) + raise RuntimeError("Can't start stats receiver: " + possible_error) def stop(self): if self._stats_proc is None or not self._stats_proc.is_running(): @@ -62,6 +71,12 @@ def wait_next_stats(self, timeout=60): time.sleep(0.05) def try_update_stats(self): + if self._stats_type == StatsType.STATSD: + return self._try_update_stats_statsd() + elif self._stats_type == StatsType.STATSHOUSE: + return self._try_update_stats_statshouse() + + def _try_update_stats_statsd(self): new_stats = {} lines = self._stats_file_read_fd.readlines() for stat_line in filter(None, lines): @@ -81,3 +96,8 @@ def try_update_stats(self): # HACK: replace prefix for kphp server stats self._stats = {re.sub("^kphp_stats\\..+\\.", "kphp_server.", k): v for k, v in new_stats.items()} return True + + def _try_update_stats_statshouse(self): + added_stats = self._stats_file_read_fd.read() + self._stats += added_stats + return len(added_stats) > 0 diff --git a/tests/python/tests/stats/test_statshouse_smoke.py b/tests/python/tests/stats/test_statshouse_smoke.py new file mode 100644 index 0000000000..489f7af59b --- /dev/null +++ b/tests/python/tests/stats/test_statshouse_smoke.py @@ -0,0 +1,41 @@ +import time + +from python.lib.testcase import KphpServerAutoTestCase +from python.lib.stats_receiver import StatsReceiver, StatsType + + +class TestStatshouseSmoke(KphpServerAutoTestCase): + WORKERS_NUM = 2 + + @classmethod + def extra_class_setup(cls): + cls.statshouse = StatsReceiver("kphp_server", cls.kphp_server_working_dir, StatsType.STATSHOUSE) + cls.statshouse.start() + cls.kphp_server.update_options({ + "--workers-num": cls.WORKERS_NUM, + "--statshouse-client": ":" + str(cls.statshouse.port), + }) + + @classmethod + def extra_class_teardown(cls): + cls.statshouse.stop() + + def _send_request(self): + resp = self.kphp_server.http_get() + self.assertEqual(resp.status_code, 200) + self.assertEqual(resp.text, "Hello world!") + + def _assert_statshouse_stats(self, needle, offset=0, timeout=5): + start = time.time() + while self.statshouse.stats.find(needle, offset) == -1: + if time.time() - start > timeout: + raise RuntimeError("Can't find string {} in StatsHouse stats with offset={}".format(needle, offset)) + self.statshouse.try_update_stats() + time.sleep(0.05) + + def test_statshouse_smoke(self): + self._assert_statshouse_stats("kphp_server_workers") + for _ in range(5): + offset = len(self.statshouse.stats) + self._send_request() + self._assert_statshouse_stats("kphp_request_time", offset) diff --git a/third-party/statshouse.h b/third-party/statshouse.h new file mode 100644 index 0000000000..83acec13e1 --- /dev/null +++ b/third-party/statshouse.h @@ -0,0 +1,1180 @@ +/* Copyright 2022 V Kontakte LLC +* +* This Source Code Form is subject to the terms of the Mozilla Public +* License, v. 2.0. If a copy of the MPL was not distributed with this +* file, You can obtain one at https://mozilla.org/MPL/2.0/. +*/ + +#pragma once + +// Header-only no dependency c++11 compatible implementation of statshouse UDP transport +// tested on linux for now. Implementations for incompatible platforms should use simple #ifdefs around network code +// packing of integers is platform-independent, packing of double may need #ifdef in doubleBits function + +// should compile without warnings with -Wno-noexcept-type -g -Wall -Wextra -Werror=return-type + +#define STATSHOUSE_TRANSPORT_VERSION "2023-06-14" +#define STATSHOUSE_USAGE_METRICS "statshouse_transport_metrics" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +// Use #ifdefs to include headers for various platforms here +#include +#include +#include + +#define STATSHOUSE_UNLIKELY(x) __builtin_expect((x), 0) // could improve packing performance on your platform. Set to (x) to disable +#define STATSHOUSE_LIKELY(x) __builtin_expect((x), 1) + +namespace statshouse { + +#if __cplusplus >= 201703L + +using string_view = std::string_view; + +#else + +class string_view { + const char * d = nullptr; + size_t s = 0; +public: + string_view() = default; + string_view(const char * str):d(str), s(std::strlen(str)) {} + string_view(const std::string & str):d(str.data()), s(str.size()) {} + string_view(const char * d, size_t s):d(d), s(s) {} + + const char * data() const { return d; } + size_t size() const { return s; } + operator std::string()const { return std::string{d, s}; } +}; + +#endif + +namespace wyhash { +// https://github.com/wangyi-fudan/wyhash + +//128bit multiply function +static inline void _wymum(uint64_t *A, uint64_t *B){ +#if defined(__SIZEOF_INT128__) + __uint128_t r=*A; r*=*B; + *A=(uint64_t)r; *B=(uint64_t)(r>>64); +#elif defined(_MSC_VER) && defined(_M_X64) + *A=_umul128(*A,*B,B); +#else + uint64_t ha=*A>>32, hb=*B>>32, la=(uint32_t)*A, lb=(uint32_t)*B, hi, lo; + uint64_t rh=ha*hb, rm0=ha*lb, rm1=hb*la, rl=la*lb, t=rl+(rm0<<32), c=t>32)+(rm1>>32)+c; + *A=lo; *B=hi; +#endif +} + +//multiply and xor mix function, aka MUM +static inline uint64_t _wymix(uint64_t A, uint64_t B){ _wymum(&A,&B); return A^B; } + +//read functions +static inline uint64_t _wyr8(const uint8_t *p) { uint64_t v; memcpy(&v, p, 8); return v;} +static inline uint64_t _wyr4(const uint8_t *p) { uint32_t v; memcpy(&v, p, 4); return v;} +static inline uint64_t _wyr3(const uint8_t *p, size_t k) { return (((uint64_t)p[0])<<16)|(((uint64_t)p[k>>1])<<8)|p[k-1];} + +//wyhash main function +static inline uint64_t wyhash(const void *key, size_t len, uint64_t seed, const uint64_t *secret){ + const uint8_t *p=(const uint8_t *)key; seed^=_wymix(seed^secret[0],secret[1]); uint64_t a, b; + if(STATSHOUSE_LIKELY(len<=16)){ + if(STATSHOUSE_LIKELY(len>=4)){ a=(_wyr4(p)<<32)|_wyr4(p+((len>>3)<<2)); b=(_wyr4(p+len-4)<<32)|_wyr4(p+len-4-((len>>3)<<2)); } + else if(STATSHOUSE_LIKELY(len>0)){ a=_wyr3(p,len); b=0;} + else a=b=0; + } + else{ + size_t i=len; + if(STATSHOUSE_UNLIKELY(i>48)){ + uint64_t see1=seed, see2=seed; + do{ + seed=_wymix(_wyr8(p)^secret[1],_wyr8(p+8)^seed); + see1=_wymix(_wyr8(p+16)^secret[2],_wyr8(p+24)^see1); + see2=_wymix(_wyr8(p+32)^secret[3],_wyr8(p+40)^see2); + p+=48; i-=48; + }while(STATSHOUSE_UNLIKELY(i>48)); + seed^=see1^see2; + } + while(STATSHOUSE_UNLIKELY(i>16)){ seed=_wymix(_wyr8(p)^secret[1],_wyr8(p+8)^seed); i-=16; p+=16; } + a=_wyr8(p+i-16); b=_wyr8(p+i-8); + } + a^=secret[1]; b^=seed; _wymum(&a,&b); + return _wymix(a^secret[0]^len,b^secret[1]); +} + +//the default secret parameters +static const uint64_t _wyp[4] = {0xa0761d6478bd642full, 0xe7037ed1a0b428dbull, 0x8ebc6af09c88c6e3ull, 0x589965cc75374cc3ull}; + +} // namespace wyhash + +struct nop_mutex { // will select lock policy later + nop_mutex() noexcept = default; + ~nop_mutex() = default; + + void lock() {} + void unlock() {} +}; + +namespace test { template struct traits; } + +class TransportUDPBase { + friend class Registry; + template friend struct test::traits; +public: + using mutex = nop_mutex; // for now use in experiments + enum { + DEFAULT_PORT = 13337, + SAFE_DATAGRAM_SIZE = 508, // https://stackoverflow.com/questions/1098897/what-is-the-largest-safe-udp-packet-size-on-the-internet + DEFAULT_DATAGRAM_SIZE = 1232, + MAX_DATAGRAM_SIZE = 65507, // https://stackoverflow.com/questions/42609561/udp-maximum-packet-size/42610200 + MAX_KEYS = 16, + MAX_FULL_KEY_SIZE = 1024, // roughly metric plus all tags + }; + + // Metric name builder. Use by calling transport.metric("metric").tag("tag1").tag("tag2").write_count(1); + // Contains reference to transport and must outlive it + class MetricBuilder { + public: + // Assigns next tag value, starting from 1 + MetricBuilder & tag(string_view str) { + return tag_id(next_tag++, str); + } + // Sets env (tag 0). If not set, transport's default_env will be used. + MetricBuilder & env(string_view str) { + env_set = true; + return tag_id(0, str); + } + // Assigns arbitrary tag value by key name + MetricBuilder & tag(string_view key, string_view str) { + auto begin = buffer + buffer_pos; + auto end = buffer + MAX_FULL_KEY_SIZE; + if (STATSHOUSE_UNLIKELY(!(begin = pack_string(begin, end, key.data(), key.size())))) { did_not_fit = true; return *this; } + if (STATSHOUSE_UNLIKELY(!(begin = pack_string(begin, end, str.data(), str.size())))) { did_not_fit = true; return *this; } + buffer_pos = begin - buffer; + tags_count++; + return *this; + } + + // for write_count. if writing with sample factor, set count to # of events before sampling + bool write_count(double count, uint32_t tsUnixSec = 0) const { + std::lock_guard lo(transport->mu); + return transport->write_count_impl(*this, count, tsUnixSec); + } + // for write_values. set count to # of events before sampling, values to sample of original values + // if no sampling is performed, pass 0 (interpreted as values_count) to count + bool write_values(const double *values, size_t values_count, double count = 0, uint32_t tsUnixSec = 0) const { + std::lock_guard lo(transport->mu); + return transport->write_values_impl(*this, values, values_count, count, tsUnixSec); + } + bool write_value(double value, uint32_t tsUnixSec = 0) const { + return write_values(&value, 1, 0, tsUnixSec); + } + // for write_unique, set count to # of events before sampling, values to sample of original hashes + // for example, if you recorded events [1,1,1,1,2], you could pass them as is or as [1, 2] into 'values' and 5 into 'count'. + bool write_unique(const uint64_t *values, size_t values_count, double count, uint32_t tsUnixSec = 0) const { + std::lock_guard lo(transport->mu); + return transport->write_unique_impl(*this, values, values_count, count, tsUnixSec); + } + bool write_unique(uint64_t value, uint32_t tsUnixSec = 0) const { + return write_unique(&value, 1, 1, tsUnixSec); + } + private: + string_view metric_name() const { return {buffer + 1, metric_name_len}; } // see pack_string + string_view full_key_name() const { return {buffer, buffer_pos}; } + size_t tags_count_pos()const { return (1 + metric_name_len + 3) & ~3; } // see pack_string + + MetricBuilder & tag_id(size_t i, string_view str) { + auto begin = buffer + buffer_pos; + auto end = buffer + MAX_FULL_KEY_SIZE; + if (STATSHOUSE_UNLIKELY(!(begin = pack32(begin, end, get_tag_name_tl(i))))) { did_not_fit = true; return *this; } + if (STATSHOUSE_UNLIKELY(!(begin = pack_string(begin, end, str.data(), str.size())))) { did_not_fit = true; return *this; } + buffer_pos = begin - buffer; + tags_count++; + return *this; + } + + friend class Registry; + friend class TransportUDPBase; + explicit MetricBuilder(TransportUDPBase *transport, string_view m):transport(transport) { + // to simplify layout in buffer, we ensure metric name is in short format + m = shorten(m); + metric_name_len = m.size(); + static_assert(MAX_FULL_KEY_SIZE > 4 + TL_MAX_TINY_STRING_LEN + 4, "not enough space to not overflow in constructor"); + auto begin = buffer + buffer_pos; + auto end = buffer + MAX_FULL_KEY_SIZE; + begin = pack_string(begin, end, m.data(), m.size()); + begin = pack32(begin, end, 0); // place for tags_count. Never updated in buffer and always stays 0, + buffer_pos = begin - buffer; + } + TransportUDPBase *transport; + bool env_set = false; // so current default_env will be added in pack_header even if set later. + bool did_not_fit = false; // we do not want any exceptions in writing metrics + size_t next_tag = 1; + size_t tags_count = 0; + size_t metric_name_len = 0; + size_t buffer_pos = 0; // not ptr because we want safe copy + char buffer[MAX_FULL_KEY_SIZE]; // Uninitialized, due to performance considerations. + }; + + explicit TransportUDPBase() = default; + virtual ~TransportUDPBase() = default; + + void set_default_env(const std::string & env) { // automatically sent as tag '0' + std::lock_guard lo(mu); + default_env = env; + if (default_env.empty()) { + default_env_tl_pair.clear(); + return; + } + + default_env_tl_pair.resize(4 + default_env.size() + 4); // key name, env, padding + auto begin = &default_env_tl_pair[0]; + auto end = begin + default_env_tl_pair.size(); + + begin = pack32(begin, end, pack_key_name(0)); + begin = pack_string(begin, end, default_env.data(), default_env.size()); + + default_env_tl_pair.resize(begin - &default_env_tl_pair[0]); + } + std::string get_default_env()const { + std::lock_guard lo(mu); + return default_env; + } + + MetricBuilder metric(string_view name) { return MetricBuilder(this, name); } + + // flush() is performed automatically every time write_* is called, but in most services + // there might be extended period, when no metrics are written, so it is recommended to + // call flush() 1-10 times per second so no metric will be stuck in a buffer for a long time. + void flush(bool force = false) { + std::lock_guard lo(mu); + auto now = time_now(); + if (force) { + flush_impl(now); + } else { + maybe_flush(now); + } + } + + // if you call this function, transport will stop calling std::time() function forever (so will be slightly faster), + // but you promise to call set_external_time forever every time clock second changes. + // You can call it as often as you like, for example on each event loop invocation before calling user code, + // so every event is marked by exact timestamp it happened. + // If you decide to call set_external_time, do not call flush() + void set_external_time(uint32_t timestamp) { + std::lock_guard lo(mu); // not perfect, make packet_ts atomic + if (STATSHOUSE_UNLIKELY(tick_external && timestamp == packet_ts)) { return; } + tick_external = true; + flush_impl(timestamp); + } + + // max packet size can be changed at any point in instance lifetime + // by default on localhost packet_size will be set to MAX_DATAGRAM_SIZE, otherwise to DEFAULT_DATAGRAM_SIZE + void set_max_udp_packet_size(size_t s) { + std::lock_guard lo(mu); + max_payload_size = std::max(SAFE_DATAGRAM_SIZE, std::min(s, MAX_DATAGRAM_SIZE)); + } + + // If set, will flush packet after each metric. Slow and unnecessary. Only for testing. + void set_immediate_flush(bool f) { + std::lock_guard lo(mu); + immediate_flush = f; + flush_impl(time_now()); + } + + struct Stats { + size_t metrics_sent = 0; + size_t metrics_overflow = 0; + size_t metrics_failed = 0; + size_t metrics_odd_kv = 0; + size_t metrics_too_big = 0; + size_t packets_sent = 0; + size_t packets_overflow = 0; + size_t packets_failed = 0; + size_t bytes_sent = 0; + std::string last_error; + }; + Stats get_stats()const { + std::lock_guard lo(mu); + Stats other = stats; + return other; + } + void clear_stats() { + std::lock_guard lo(mu); + stats = Stats{}; + } + + // writes and clears per metric counters to meta metric STATSHOUSE_USAGE_METRICS + // status "ok" is written always, error statuses only if corresponding counter != 0 + bool write_usage_metrics(string_view project, string_view cluster) { + std::lock_guard lo(mu); + auto result = true; + result = write_usage_metric_impl(project, cluster, "ok", &stats.metrics_sent , true ) && result; + result = write_usage_metric_impl(project, cluster, "err_sendto_would_block", &stats.metrics_overflow, false) && result; + result = write_usage_metric_impl(project, cluster, "err_sendto_other", &stats.metrics_failed , false) && result; + result = write_usage_metric_impl(project, cluster, "err_odd_kv", &stats.metrics_odd_kv , false) && result; + result = write_usage_metric_impl(project, cluster, "err_header_too_big", &stats.metrics_too_big , false) && result; + return result; + } + + static std::string version() { return STATSHOUSE_TRANSPORT_VERSION; } + +protected: + Stats stats; // allow implementations to update stats directly for simplicity +private: + enum { + TL_INT_SIZE = 4, + TL_LONG_SIZE = 8, + TL_DOUBLE_SIZE = 8, + TL_MAX_TINY_STRING_LEN = 253, + TL_BIG_STRING_LEN = 0xffffff, + TL_BIG_STRING_MARKER = 0xfe, + TL_STATSHOUSE_METRICS_BATCH_TAG = 0x56580239, + TL_STATSHOUSE_METRIC_COUNTER_FIELDS_MASK = 1 << 0, + TL_STATSHOUSE_METRIC_TS_FIELDS_MASK = 1 << 4, + TL_STATSHOUSE_METRIC_VALUE_FIELDS_MASK = 1 << 1, + TL_STATSHOUSE_METRIC_UNIQUE_FIELDS_MASK = 1 << 2, + BATCH_HEADER_LEN = TL_INT_SIZE * 3 // TL tag, fields_mask, # of batches + }; + mutable mutex mu; + size_t batch_size = 0; // we fill packet header before sending + size_t packet_len = BATCH_HEADER_LEN; // reserve space for metric batch header + + std::string default_env; + std::string default_env_tl_pair; // "0", default_env in TL format to optimized writing + + size_t max_payload_size = DEFAULT_DATAGRAM_SIZE; + bool immediate_flush = false; + bool tick_external = false; + uint32_t packet_ts = 0; // also serves as external clock + + char packet[MAX_DATAGRAM_SIZE]{}; // zeroing is cheap, we are cautious + + uint32_t time_now() const { + return tick_external ? packet_ts : static_cast(std::time(nullptr)); + } + static void put32(char *buf, uint32_t val) { // optimized to mov by modern compiler + // std::memcpy(buf, &val, 4); // endian dependent, hence commented, code below is equally fast + buf[0] = char(val); + buf[1] = char(val >> 8); + buf[2] = char(val >> 16); + buf[3] = char(val >> 24); + } + static void put64(char *buf, uint64_t val) { // optimized to mov by modern compiler + // std::memcpy(buf, &val, 8); // endian dependent, hence commented, code below is equally fast + buf[0] = char(val); + buf[1] = char(val >> 8); + buf[2] = char(val >> 16); + buf[3] = char(val >> 24); + buf[4] = char(val >> 32); + buf[5] = char(val >> 40); + buf[6] = char(val >> 48); + buf[7] = char(val >> 56); + } + static bool enoughSpace(const char * begin, const char * end, size_t req) { return begin + req <= end; } + static char * pack32(char * begin, const char * end, size_t v) { + if (STATSHOUSE_UNLIKELY(!enoughSpace(begin, end, 4))) { return nullptr; } + put32(begin, uint32_t(v)); + return begin + 4; + } + static char * pack64(char * begin, const char * end, uint64_t v) { + if (STATSHOUSE_UNLIKELY(!enoughSpace(begin, end, 8))) { return nullptr; } + put64(begin, v); + return begin + 8; + } + static uint64_t doubleBits(double v) { + uint64_t v64 = 0; + static_assert( + sizeof(v) == sizeof v64, "Please make ifdef here with code writing IEEE 64-bit LE double for your (exotic?) platform"); + std::memcpy(&v64, &v, sizeof(v)); + return v64; + } + static string_view shorten(string_view x) { return string_view{x.data(), std::min(x.size(), TL_MAX_TINY_STRING_LEN)}; } + // Trim is VERY costly, 2x slowdown even when no actual trim performed. We do not want to punish good guys, and for bad guys + // we have 'err_header_too_big' usage meta metric + // static char * pack_string_trim(char * begin, const char * end, const char *str, size_t len) { + // while (STATSHOUSE_UNLIKELY(len > 0 && std::isspace(static_cast(*str)))) { + // ++str; + // --len; + // } + // return pack_string(begin, end, str, len); + // } + static constexpr uint32_t pack_key_name(size_t i) { + static_assert(MAX_KEYS <= 100, "key names up to 100 supported"); + return (i < 10) ? (1 | (uint32_t('0' + i) << 8)) : (2 | (uint32_t('0' + i/10) << 8) | (uint32_t('0' + i%10) << 16)); + } + static uint32_t get_tag_name_tl(size_t i) { + static constexpr std::array names{ + pack_key_name(0), + pack_key_name(1), + pack_key_name(2), + pack_key_name(3), + pack_key_name(4), + pack_key_name(5), + pack_key_name(6), + pack_key_name(7), + pack_key_name(8), + pack_key_name(9), + pack_key_name(10), + pack_key_name(11), + pack_key_name(12), + pack_key_name(13), + pack_key_name(14), + pack_key_name(15), + pack_key_name(16), + // we add extra key on purpose. statshouse will show receive error "tag 16 does not exist" if we overflow + }; + static_assert(names[MAX_KEYS] != 0, "please add key names to array when increasing MAX_KEYS"); + if (i > names.size()) + return 0; // if even more tags added, we do not care, so empty string is good enough. + return names[i]; + } + static char * pack_string(char * begin, const char * end, const char * str, size_t len) { + if (STATSHOUSE_UNLIKELY(len > TL_MAX_TINY_STRING_LEN)) { + if (STATSHOUSE_UNLIKELY(len > TL_BIG_STRING_LEN)) { + len = TL_BIG_STRING_LEN; + } + auto fullLen = (4 + len + 3) & ~3; + if (STATSHOUSE_UNLIKELY(!enoughSpace(begin, end, fullLen))) { + return nullptr; + } + put32(begin + fullLen - 4, 0); // padding first + put32(begin, (len << 8U) | TL_BIG_STRING_MARKER); + std::memcpy(begin+4, str, len); + begin += fullLen; + } else { + auto fullLen = (1 + len + 3) & ~3; + if (STATSHOUSE_UNLIKELY(!enoughSpace(begin, end, fullLen))) { + return nullptr; + } + put32(begin + fullLen - 4, 0); // padding first + *begin = static_cast(len); // or put32(p, len); + std::memcpy(begin+1, str, len); + begin += fullLen; + } + return begin; + } + char * pack_header(uint32_t now, size_t min_space, const MetricBuilder & metric, + double counter, uint32_t tsUnixSec, size_t fields_mask) { + if (STATSHOUSE_UNLIKELY(metric.did_not_fit)) { + stats.last_error.clear(); // prevent allocations + stats.last_error.append("statshouse::TransportUDP header too big for metric="); + stats.last_error.append(metric.metric_name()); + ++stats.metrics_too_big; + return nullptr; + } + if (tsUnixSec == 0) { + tsUnixSec = now; + } + char * begin = packet + packet_len; + const char * end = packet + max_payload_size; + if ((begin = pack_header_impl(begin, end, metric, counter, tsUnixSec, fields_mask)) && enoughSpace(begin, end, min_space)) { + return begin; + } + if (packet_len != BATCH_HEADER_LEN) { + flush_impl(now); + begin = packet + packet_len; + if ((begin = pack_header_impl(begin, end, metric, counter, tsUnixSec, fields_mask)) && enoughSpace(begin, end, min_space)) { + return begin; + } + } + stats.last_error.clear(); // prevent allocations + stats.last_error.append("statshouse::TransportUDP header too big for metric="); + stats.last_error.append(metric.metric_name()); + ++stats.metrics_too_big; + return nullptr; + } + char * pack_header_impl(char * begin, const char * end, const MetricBuilder & metric, + double counter, uint32_t tsUnixSec, size_t fields_mask) { + if (tsUnixSec != 0) { + fields_mask |= TL_STATSHOUSE_METRIC_TS_FIELDS_MASK; + } + if (STATSHOUSE_UNLIKELY(!(begin = pack32(begin, end, fields_mask)))) { return nullptr; } + if (STATSHOUSE_UNLIKELY(!enoughSpace(begin, end, metric.buffer_pos))) { return nullptr; } + std::memcpy(begin, metric.buffer, metric.buffer_pos); + const bool add_env = !metric.env_set && !default_env_tl_pair.empty(); + put32(begin + metric.tags_count_pos(), metric.tags_count + int(add_env)); + begin += metric.buffer_pos; + if (add_env) { + if (STATSHOUSE_UNLIKELY(!enoughSpace(begin, end, default_env_tl_pair.size()))) { return nullptr; } + std::memcpy(begin, default_env_tl_pair.data(), default_env_tl_pair.size()); + begin += default_env_tl_pair.size(); + } + if (fields_mask & TL_STATSHOUSE_METRIC_COUNTER_FIELDS_MASK) { + if (STATSHOUSE_UNLIKELY(!(begin = pack64(begin, end, doubleBits(counter))))) { return nullptr;} + } + if (fields_mask & TL_STATSHOUSE_METRIC_TS_FIELDS_MASK) { + if (STATSHOUSE_UNLIKELY(!(begin = pack32(begin, end, tsUnixSec)))) { return nullptr; } + } + return begin; + } + bool write_count_impl(const MetricBuilder & metric, double count, uint32_t tsUnixSec) { + auto now = time_now(); + char * begin = pack_header(now, 0, metric, count, tsUnixSec, TL_STATSHOUSE_METRIC_COUNTER_FIELDS_MASK); + if (!begin) { + return false; // did not fit into empty buffer + } + packet_len = begin - packet; + ++batch_size; + maybe_flush(now); + return true; + } + bool write_values_impl(const MetricBuilder & metric, const double *values, size_t values_count, double count, uint32_t tsUnixSec) { + size_t fields_mask = TL_STATSHOUSE_METRIC_VALUE_FIELDS_MASK; + if (count != 0 && count != double(values_count)) { + fields_mask |= TL_STATSHOUSE_METRIC_COUNTER_FIELDS_MASK; + } + auto now = time_now(); + const char * end = packet + max_payload_size; + while (values_count != 0) { + char * begin = pack_header(now, TL_INT_SIZE + TL_DOUBLE_SIZE, metric, count, tsUnixSec, fields_mask); + if (!begin) { + return false; // did not fit into empty buffer + } + auto write_count = std::min(values_count, (end - begin - TL_INT_SIZE) / TL_DOUBLE_SIZE); // at least 1 + put32(begin, write_count); + begin += TL_INT_SIZE; + for (size_t j = 0; j != write_count; ++j) { + put64(begin+j*TL_DOUBLE_SIZE, doubleBits(values[j])); + } + values += write_count; + values_count -= write_count; + packet_len = begin + write_count*TL_DOUBLE_SIZE - packet; + ++batch_size; + } + maybe_flush(now); + return true; + } + bool write_unique_impl(const MetricBuilder & metric, const uint64_t *values, size_t values_count, double count, uint32_t tsUnixSec) { + size_t fields_mask = TL_STATSHOUSE_METRIC_UNIQUE_FIELDS_MASK; + if (count != 0 && count != double(values_count)) { + fields_mask |= TL_STATSHOUSE_METRIC_COUNTER_FIELDS_MASK; + } + auto now = time_now(); + const char * end = packet + max_payload_size; + while (values_count != 0) { + char * begin = pack_header(now, TL_INT_SIZE + TL_LONG_SIZE, metric, count, tsUnixSec, fields_mask); + if (!begin) { + return false; // did not fit into empty buffer + } + auto write_count = std::min(values_count, (end - begin - TL_INT_SIZE) / TL_LONG_SIZE); // at least 1 + put32(begin, write_count); + begin += TL_INT_SIZE; + for (size_t j = 0; j != write_count; ++j) { + put64(begin+j*TL_LONG_SIZE, values[j]); + } + values += write_count; + values_count -= write_count; + packet_len = begin + write_count*TL_LONG_SIZE - packet; + ++batch_size; + } + maybe_flush(now); + return true; + } + bool write_usage_metric_impl(string_view project, string_view cluster, string_view status, size_t * value, bool send_if_0) { + if (*value || send_if_0) { + auto count = double(*value); + *value = 0; + write_count_impl(metric(STATSHOUSE_USAGE_METRICS) + .tag("status", status) + .tag("project" ,project) + .tag("cluster", cluster) + .tag("protocol", "udp") + .tag("language", "cpp") + .tag("version", STATSHOUSE_TRANSPORT_VERSION), count, 0); + } + return true; + } + void maybe_flush(uint32_t now) { + if (immediate_flush || now != packet_ts) { + flush_impl(now); + } + } + virtual bool on_flush_packet(const char * data, size_t size, size_t batch_size) = 0; + void flush_impl(uint32_t now) { + packet_ts = now; + if (batch_size == 0) { + return; + } + put32(packet, TL_STATSHOUSE_METRICS_BATCH_TAG); + put32(packet + TL_INT_SIZE, 0); // fields mask + put32(packet + 2*TL_INT_SIZE, uint32_t(batch_size)); // batch size + + auto result = on_flush_packet(packet, packet_len, batch_size); + if (result) { + ++stats.packets_sent; + stats.metrics_sent += batch_size; + stats.bytes_sent += packet_len; + } else { + ++stats.packets_failed; + stats.metrics_failed += batch_size; + } + // we will lose usage metrics if packet with usage is discarded, but we are ok with that + batch_size = 0; + packet_len = BATCH_HEADER_LEN; + } +}; + +// Use #ifdefs to customize for various platforms here +class TransportUDP : public TransportUDPBase { + template friend struct test::traits; +public: + // no functions of this class throw + // pass empty ip to use as dummy writer + // access last error and statistics with move_stats() + // all functions return false in case error happened, you can ignore them or write last error to log periodically + // in multithread environment use external lock to access this instance + TransportUDP():TransportUDP("127.0.0.1", DEFAULT_PORT) {} + TransportUDP(const std::string &ip, int port): dummy_instance(ip.empty() || port == 0) { + if (!dummy_instance) { + udp_socket = create_socket(ip, port); + } + } + TransportUDP(const TransportUDP &) = delete; + TransportUDP &operator=(const TransportUDP &) = delete; + ~TransportUDP() override { + flush(true); + (void)::close(udp_socket); + } + bool is_socket_valid() const { return udp_socket >= 0; } + + static int test_main() { // call from your main for testing + TransportUDP statshouse; + + statshouse.set_default_env("production"); + + statshouse.metric("toy" ).tag("android").tag("count").write_count(7); + std::vector values{1, 2, 3}; + statshouse.metric("toy" ).tag("android").tag("values").write_values(values.data(), values.size(), 6); + std::vector uniques{1, 2, 3}; + statshouse.metric("toy" ).tag("android").tag("uniques").env("staging").write_unique(uniques.data(), uniques.size(), 5, 1630000000); + + statshouse.metric("toy" ).tag("platform", "android").tag("2", "count_kv").write_count(1); + + statshouse.write_usage_metrics("test_main", "toy"); + return 0; + } + static size_t benchmark_pack_header(size_t total_size) { + TransportUDP tmp("", 0); + tmp.set_default_env("production"); + tmp.set_max_udp_packet_size(MAX_DATAGRAM_SIZE); + tmp.set_external_time(std::time(nullptr)); + + std::vector dynamic_tags; + while(dynamic_tags.size() < 1000000) { + dynamic_tags.push_back("tag3" + std::to_string(dynamic_tags.size())); + } + size_t next_tag_value = 0; + + while (tmp.stats.bytes_sent < total_size) { + // when all tags are static, this compiles into setting metric memory to fixed bytes corresponding to TL representation of strings + tmp.metric("typical_metric_name").tag("tag1_name").tag("tag2_name").tag(dynamic_tags[next_tag_value]).tag( + "tag4_name").write_count(1); + if (++next_tag_value >= dynamic_tags.size()) + next_tag_value = 0; + } + + return tmp.stats.metrics_sent; + } +private: + int udp_socket = -1; + bool dummy_instance = false; // better than separate class, because can be set depending on config + + bool on_flush_packet(const char * data, size_t size, size_t batch_size) override { + if (dummy_instance) { + return true; + } + if (!is_socket_valid()) { // we reported connection error after start + return false; + } + + ssize_t result = ::sendto(udp_socket, data, size, MSG_DONTWAIT, nullptr, 0); + + if (result >= 0) { + return true; + } + auto err = errno; + if (err == EAGAIN) { // || err == EWOULDBLOCK + ++stats.packets_overflow; + stats.metrics_overflow += batch_size; + return false; + } + set_errno_error(err, "statshouse::TransportUDP sendto() failed"); + return false; + } + + int create_socket(const std::string &ip, int port) { + if (port < 0 || port > 0xffff) { + stats.last_error = "statshouse::TransportUDP invalid port=" + std::to_string(port); + return -1; + } + ::sockaddr_storage addr = {}; + auto ap = reinterpret_cast(&addr); + auto ap6 = reinterpret_cast(ap); + auto ap4 = reinterpret_cast(ap); + int ap_len = 0; + if (inet_pton(AF_INET6, ip.c_str(), &ap6->sin6_addr) == 1) { + addr.ss_family = AF_INET6; + ap6->sin6_port = htons(uint16_t(port)); + ap_len = sizeof(*ap6); + // TODO - check if localhost and increase default packet size + } else if (inet_pton(AF_INET, ip.c_str(), &ap4->sin_addr) == 1) { + addr.ss_family = AF_INET; + ap4->sin_port = htons(uint16_t(port)); + ap_len = sizeof(*ap4); + char high_byte = 0; + std::memcpy(&high_byte, &ap4->sin_addr, 1); // this is correct, sin_addr in network byte order + if (high_byte == 0x7F) { + set_max_udp_packet_size(MAX_DATAGRAM_SIZE); + } + } else { + stats.last_error = "statshouse::TransportUDP could not parse ip=" + ip; + return -1; + } + auto sock = ::socket(addr.ss_family, SOCK_DGRAM, IPPROTO_UDP); + if (sock < 0) { + set_errno_error(errno, "statshouse::TransportUDP socket() failed"); + return -1; + } + if (::connect(sock, ap, ap_len) < 0) { + ::close(sock); + set_errno_error(errno, "statshouse::TransportUDP connect() failed"); + return -1; + } + return sock; + } + void set_errno_error(int err, const char *msg) { + stats.last_error.clear(); // prevent allocations + stats.last_error.append(msg); + stats.last_error.append(" errno="); + stats.last_error.append(std::to_string(err)); + stats.last_error.append(", "); + stats.last_error.append(strerror(err)); + } +}; + +class Registry { + template friend struct test::traits; + enum { + DEFAULT_MAX_BUCKET_SIZE = 1024, + EXPLICIT_FLUSH_CHUNK_SIZE = 10, + INCREMENTAL_FLUSH_SIZE = 2, + }; +public: + Registry() + : Registry("127.0.0.1", TransportUDP::DEFAULT_PORT) { + } + Registry(const std::string &host, int port) + : max_bucket_size{DEFAULT_MAX_BUCKET_SIZE} + , time_external{0} + , incremental_flush_disabled{false} + , transport{host, port} { + } + Registry(const Registry &other) = delete; + Registry(Registry &&other) = delete; + Registry &operator=(const Registry &other) = delete; + Registry &operator=(Registry &&other) = delete; + ~Registry() { + flush(true); + } +private: + struct multivalue_view { + explicit multivalue_view(double count) + : count{count} + , values_count{0} + , values{nullptr} + , unique{nullptr} { + } + multivalue_view(double count, size_t values_count, const double *values) + : count{std::max(count, static_cast(values_count))} + , values_count{values_count} + , values{values} + , unique{nullptr} { + } + multivalue_view(double count, size_t values_count, const uint64_t *unique) + : count{std::max(count, static_cast(values_count))} + , values_count{values_count} + , values{nullptr} + , unique{unique} { + } + size_t empty() const { + return count == 0 && !values_count; + } + double count; + size_t values_count; + const double *values; + const uint64_t *unique; + }; + struct multivalue { + void write(multivalue_view &src, size_t limit) { + if (src.values) { + write(values, src.values, src.values_count, src.count, limit); + } else if (src.unique) { + write(unique, src.unique, src.values_count, src.count, limit); + } else { + count += src.count; + src.count = 0; + } + } + template + void write(std::vector &dst, const T *src, size_t &src_size, double &src_count, size_t limit) { + if (!src_size || limit <= dst.size()) { + return; + } + // limit size & count + auto effective_size = std::min(src_size, limit - dst.size()); + auto effective_count = (effective_size / src_size) * src_count; + // update this + dst.insert(dst.end(), src, src + effective_size); + count += effective_count; + // update arguments + src_size -= effective_size; + src_count -= effective_count; + } + bool empty() const { + return count == 0 && values.empty() && unique.empty(); + } + double count{}; + std::vector values; + std::vector unique; + }; + struct bucket { + bucket(const TransportUDP::MetricBuilder &key, uint32_t timestamp) + : key{key} + , timestamp{timestamp} + , queue_ptr{nullptr} { + } + TransportUDP::MetricBuilder key; + uint32_t timestamp; + multivalue value; + std::mutex mu; + std::shared_ptr *queue_ptr; + }; +public: + class MetricRef { + public: + MetricRef(Registry *registry, const TransportUDP::MetricBuilder &key) + : registry{registry} + , ptr{registry->get_or_create_bucket(key)} { + } + bool write_count(double count, uint32_t timestamp = 0) const { + if (timestamp) { + std::lock_guard transport_lock{registry->transport_mu}; + return ptr->key.write_count(count, timestamp); + } + multivalue_view value{count}; + return registry->update_multivalue_by_ref(ptr, value); + } + bool write_value(double value, uint32_t timestamp = 0) const { + return write_values(&value, 1, 1, timestamp); + } + bool write_values(const double *values, size_t values_count, double count = 0, uint32_t timestamp = 0) const { + if (timestamp || values_count >= registry->max_bucket_size.load(std::memory_order_relaxed)) { + std::lock_guard transport_lock{registry->transport_mu}; + return ptr->key.write_values(values, values_count, count, timestamp); + } + multivalue_view value{count, values_count, values}; + return registry->update_multivalue_by_ref(ptr, value); + } + bool write_unique(uint64_t value, uint32_t timestamp = 0) const { + return write_unique(&value, 1, 1, timestamp); + } + bool write_unique(const uint64_t *values, size_t values_count, double count, uint32_t timestamp = 0) const { + if (timestamp || values_count >= registry->max_bucket_size.load(std::memory_order_relaxed)) { + std::lock_guard transport_lock{registry->transport_mu}; + return ptr->key.write_unique(values, values_count, count, timestamp); + } + multivalue_view value{count, values_count, values}; + return registry->update_multivalue_by_ref(ptr, value); + } + private: + Registry *registry; + std::shared_ptr ptr; + }; + class MetricBuilder { + public: + MetricBuilder(Registry *registry, string_view name) + : registry{registry} + , key{®istry->transport, name} { + } + MetricBuilder &tag(string_view str) { + key.tag(str); + return *this; + } + MetricBuilder &env(string_view str) { + key.env(str); + return *this; + } + MetricBuilder &tag(string_view a, string_view b) { + key.tag(a, b); + return *this; + } + MetricRef ref() const { + return MetricRef{registry, key}; + } + bool write_count(double count, uint32_t timestamp = 0) const { + if (timestamp) { + std::lock_guard transport_lock{registry->transport_mu}; + return key.write_count(count, timestamp); + } + multivalue_view value{count}; + return registry->update_multivalue_by_key(key, value); + } + bool write_value(double value, uint32_t timestamp = 0) const { + return write_values(&value, 1, 1, timestamp); + } + bool write_values(const double *values, size_t values_count, double count = 0, uint32_t timestamp = 0) const { + if (timestamp || values_count >= registry->max_bucket_size.load(std::memory_order_relaxed)) { + std::lock_guard transport_lock{registry->transport_mu}; + return key.write_values(values, values_count, count, timestamp); + } + multivalue_view value{count, values_count, values}; + return registry->update_multivalue_by_key(key, value); + } + bool write_unique(uint64_t value, uint32_t timestamp = 0) const { + return write_unique(&value, 1, 1, timestamp); + } + bool write_unique(const uint64_t *values, size_t values_count, double count, uint32_t timestamp = 0) const { + if (timestamp || values_count >= registry->max_bucket_size.load(std::memory_order_relaxed)) { + std::lock_guard transport_lock{registry->transport_mu}; + return key.write_unique(values, values_count, count, timestamp); + } + multivalue_view value{count, values_count, values}; + return registry->update_multivalue_by_key(key, value); + } + private: + Registry *registry; + TransportUDP::MetricBuilder key; + }; + MetricBuilder metric(string_view name) { + return {this, name}; + } + // If called once, then you need to call it until the end of the life of the registry. + void set_external_time(uint32_t timestamp) { + time_external = timestamp; + } + // Call once during initialization, if set then you are responsible for calling "flush". + void disable_incremental_flush() { + incremental_flush_disabled = true; + } + void flush(bool force = false) { + if (force) { + flush(time_now()); + } else if (incremental_flush_disabled) { + flush(time_now() - 1); + } else { + flush(time_now() - 2); + } + } + void set_max_udp_packet_size(size_t n) { + std::lock_guard transport_lock{transport_mu}; + transport.set_max_udp_packet_size(n); + } + void set_max_bucket_size(size_t n) { + max_bucket_size.store(n, std::memory_order_relaxed); + } + void set_default_env(const std::string &env) { + std::lock_guard transport_lock{transport_mu}; + transport.set_default_env(env); + } + class Clock { + public: + explicit Clock(Registry *registry) + : thread{start(registry)} { + } + ~Clock() { + stop = true; + thread.join(); + } + private: + std::thread start(Registry *registry) { + registry->set_external_time(std::chrono::time_point_cast(std::chrono::system_clock::now()).time_since_epoch().count()); + return std::thread(&Clock::run, this, registry); + } + void run(Registry *registry) { + while (!stop) { + auto atime = std::chrono::time_point_cast(std::chrono::system_clock::now()) + std::chrono::seconds{1}; + std::this_thread::sleep_until(atime); + if (stop) { + break; + } + registry->set_external_time(atime.time_since_epoch().count()); + } + } + std::thread thread; + std::atomic_bool stop{}; + }; +private: + bool update_multivalue_by_key(const TransportUDP::MetricBuilder &key, multivalue_view &value) { + auto timestamp = time_now(); + return update_multivalue_by_ref(get_or_create_bucket(key, timestamp), timestamp, value); + } + bool update_multivalue_by_ref(const std::shared_ptr &ptr, multivalue_view &value) { + return update_multivalue_by_ref(ptr, time_now(), value); + } + bool update_multivalue_by_ref(const std::shared_ptr &ptr, uint32_t timestamp, multivalue_view &value) { + auto limit = max_bucket_size.load(std::memory_order_relaxed); + { + std::lock_guard bucket_lock{ptr->mu}; + if (timestamp <= ptr->timestamp) { + ptr->value.write(value, limit); + if (value.empty()) { + return true; // fast path + } + // protect against clock going backwards before falling through + timestamp = ptr->timestamp; + } else { + // next second + } + } + // split + size_t queue_size = 0; + { + std::lock_guard lock{mu}; + queue.emplace_back(std::move(*ptr->queue_ptr)); + *ptr->queue_ptr = alloc_bucket(ptr->key, ptr->timestamp); + { + std::lock_guard bucket_lock{ptr->mu}; + std::swap((*ptr->queue_ptr)->value, ptr->value); + ptr->timestamp = timestamp; + ptr->queue_ptr = &queue.back(); + ptr->value.write(value, limit); + // assert(value.empty()) + } + queue_size = queue.size(); + } + if (incremental_flush_disabled) { + return true; + } + // flush at most INCREMENTAL_FLUSH_SIZE buckets, don't flush bucket just added + std::array, INCREMENTAL_FLUSH_SIZE> buffer; + flush_some(buffer.data(), std::min(queue_size - 1, buffer.size())); + return true; + } + void flush(uint32_t timestamp) { + std::array, EXPLICIT_FLUSH_CHUNK_SIZE> buffer; + while (flush_some(buffer.data(), buffer.size(), timestamp)) { + // pass + } + } + bool flush_some(std::shared_ptr *buffer, size_t size, uint32_t timestamp = std::numeric_limits::max()) { + size_t count = 0; + { + std::lock_guard lock{mu}; + for (; count < size && !queue.empty() && queue.front()->timestamp <= timestamp;) { + auto &ptr = queue.front(); + auto keep = false; + { + std::lock_guard bucket_lock{ptr->mu}; + if (!ptr->value.empty()) { + buffer[count] = alloc_bucket(ptr->key, ptr->timestamp); + std::swap(buffer[count]->value, ptr->value); + ++count; + keep = true; + } + } + // pop front + if (keep || ptr.use_count() > 2) { + // either there are clients holding bucket reference + // or bucket was written recently, move bucket back + ptr->timestamp = time_now(); + queue.push_back(std::move(ptr)); + queue.back()->queue_ptr = &queue.back(); + } else { + // removing bucket with non-zero "queue_ptr" implies removal from dictionary + if (ptr->queue_ptr) { + ptr->queue_ptr = nullptr; + buckets.erase(string_view{ptr->key.buffer, ptr->key.buffer_pos}); + } + // unused bucket goes into freelist + freelist.emplace_back(std::move(ptr)); + } + queue.pop_front(); + } + } + for (size_t i = 0; i < count; ++i) { + auto &ptr = buffer[i]; + { + std::lock_guard transport_lock{transport_mu}; + auto &v = ptr->value; + if (!v.values.empty()) { + ptr->key.write_values(v.values.data(), v.values.size(), v.count, timestamp); + } else if (!v.unique.empty()) { + ptr->key.write_unique(v.unique.data(), v.unique.size(), v.count, timestamp); + } else if (v.count != 0) { + ptr->key.write_count(v.count, timestamp); + } + v.count = 0; + v.values.clear(); + v.unique.clear(); + } + std::lock_guard lock{mu}; + freelist.emplace_back(std::move(ptr)); + } + return count == size; + } + std::shared_ptr get_or_create_bucket(const TransportUDP::MetricBuilder &key, uint32_t timestamp = 0) { + std::lock_guard lock{mu}; + auto it = buckets.find(string_view{key.buffer, key.buffer_pos}); + if (it != buckets.end()) { + return it->second; + } + auto ptr = alloc_bucket(key, timestamp ? timestamp : time_now()); + queue.emplace_back(buckets.emplace(string_view{ptr->key.buffer, ptr->key.buffer_pos}, ptr).first->second); + ptr->queue_ptr = &queue.back(); + return ptr; + } + std::shared_ptr alloc_bucket(const TransportUDP::MetricBuilder &key, uint32_t timestamp) { + if (freelist.empty()) { + return std::make_shared(key, timestamp); + } + auto ptr = std::move(freelist.back()); + ptr->key = key; + ptr->timestamp = timestamp; + freelist.pop_back(); + return ptr; + } + uint32_t time_now() const { + uint32_t t{time_external}; + return t != 0 ? t : static_cast(std::time(nullptr)); + } + struct string_view_hash { + size_t operator()(string_view a) const { + return wyhash::wyhash(a.data(), a.size(), 0, wyhash::_wyp); + } + }; + struct string_view_equal { + bool operator()(string_view a, string_view b) const { + return a.size() == b.size() && + !std::memcmp(a.data(), b.data(), a.size()); + } + }; + std::mutex mu; + std::mutex transport_mu; + std::atomic max_bucket_size; + std::atomic time_external; + bool incremental_flush_disabled; + TransportUDP transport; + std::deque> queue; + std::vector> freelist; + std::unordered_map, string_view_hash, string_view_equal> buckets; +}; + +} // namespace statshouse + +#undef STATSHOUSE_UNLIKELY +#undef STATSHOUSE_USAGE_METRIC +#undef STATSHOUSE_TRANSPORT_VERSION