diff --git a/compiler/pipes/register-kphp-configuration.cpp b/compiler/pipes/register-kphp-configuration.cpp index e7ee4c9ef2..3afc8cbbe8 100644 --- a/compiler/pipes/register-kphp-configuration.cpp +++ b/compiler/pipes/register-kphp-configuration.cpp @@ -135,13 +135,11 @@ void RegisterKphpConfiguration::handle_constant_runtime_options(const ClassMembe register_confdata_blacklist(opt_pair->value()); } else if (*opt_key == confdata_predefined_wildcard_key_) { register_confdata_predefined_wildcard(opt_pair->value()); - } else if (*opt_key == mysql_db_name_key_) { - register_mysql_db_name(opt_pair->value()); } else if (*opt_key == net_dc_mask_key_) { register_net_dc_mask(opt_pair->value()); } else if (vk::any_of_equal(*opt_key, warmup_workers_part_key_, warmup_instance_cache_elements_part_key_, warmup_timeout_sec_key_, - oom_handling_memory_ratio_key_)) { + oom_handling_memory_ratio_key_, mysql_db_name_key_, job_workers_shared_memory_distribution_weights_)) { generic_register_simple_option(opt_pair->value(), *opt_key); } else { kphp_error(0, fmt_format("Got unexpected option {}::{}['{}']", @@ -186,10 +184,6 @@ void RegisterKphpConfiguration::register_confdata_predefined_wildcard(VertexPtr } } -void RegisterKphpConfiguration::register_mysql_db_name(VertexPtr value) const noexcept { - generic_register_simple_option(value, mysql_db_name_key_); -} - void RegisterKphpConfiguration::register_net_dc_mask(VertexPtr value) const noexcept { auto dc_masks = VertexUtil::get_actual_value(value).try_as(); kphp_error_return(dc_masks, fmt_format("{}[{}] must be a constexpr array", diff --git a/compiler/pipes/register-kphp-configuration.h b/compiler/pipes/register-kphp-configuration.h index 28a966ca2b..150c92d7da 100644 --- a/compiler/pipes/register-kphp-configuration.h +++ b/compiler/pipes/register-kphp-configuration.h @@ -20,7 +20,6 @@ class RegisterKphpConfiguration final : public SyncPipeF { void register_confdata_blacklist(VertexPtr value) const noexcept; void register_confdata_predefined_wildcard(VertexPtr value) const noexcept; - void register_mysql_db_name(VertexPtr value) const noexcept; void register_net_dc_mask(VertexPtr value) const noexcept; void handle_constant_function_palette(const ClassMemberConstant &c); @@ -43,6 +42,8 @@ class RegisterKphpConfiguration final : public SyncPipeF { const vk::string_view oom_handling_memory_ratio_key_{"--oom-handling-memory-ratio"}; + const vk::string_view job_workers_shared_memory_distribution_weights_{"--job-workers-shared-memory-distribution-weights"}; + public: void execute(FunctionPtr function, DataStream &unused_os) final; void on_finish(DataStream &os) override; diff --git a/runtime/memory_resource/extra-memory-pool.h b/runtime/memory_resource/extra-memory-pool.h index 7ea4129b90..b868291532 100644 --- a/runtime/memory_resource/extra-memory-pool.h +++ b/runtime/memory_resource/extra-memory-pool.h @@ -63,40 +63,10 @@ class extra_memory_raw_bucket : vk::not_copyable { return raw_memory_ + index * buffer_size_; } - static int get_bucket(const extra_memory_pool &pool) noexcept { - return __builtin_ctzll(pool.get_buffer_size()) - 20; - } - - static size_t get_size_by_bucket(size_t id) noexcept { - return 1 << (20 + id); - } - private: uint8_t *raw_memory_{nullptr}; size_t buffers_count_{0}; size_t buffer_size_{0}; }; -template -size_t distribute_memory(std::array &extra_memory, size_t initial_count, uint8_t *raw, size_t size) noexcept { - size_t left_memory = size; - for (size_t i = 0; i != extra_memory.size(); ++i) { - // (2^i) MB - const size_t buffer_size = extra_memory_raw_bucket::get_size_by_bucket(i); - size_t buffers_count = left_memory / buffer_size; - if (i + 1 != extra_memory.size()) { - // (initial_count / 2^i) - buffers_count = std::min(std::max(initial_count >> i, size_t{1}), buffers_count); - const size_t left_memory_for_next = left_memory - buffers_count * buffer_size; - const size_t next_buffer_size = buffer_size << 1; - buffers_count += left_memory_for_next % next_buffer_size >= buffer_size; - } - - left_memory -= buffers_count * buffer_size; - extra_memory[i].init(raw, buffers_count, buffer_size); - raw += buffers_count * buffer_size; - } - return left_memory; -} - } // namespace memory_resource diff --git a/server/job-workers/job-message.h b/server/job-workers/job-message.h index a864eafea7..fa6af2c11b 100644 --- a/server/job-workers/job-message.h +++ b/server/job-workers/job-message.h @@ -12,19 +12,24 @@ namespace job_workers { -// this constant is used for calculating total available messages count: -// messages count = the processes number * JOB_DEFAULT_SHARED_MESSAGES_COUNT_PROCESS_MULTIPLIER -constexpr size_t JOB_DEFAULT_SHARED_MESSAGES_COUNT_PROCESS_MULTIPLIER = 2; +constexpr size_t JOB_SHARED_MESSAGE_SIZE_EXP = 17; // the size of the job shared message (without extra memory) -constexpr size_t JOB_SHARED_MESSAGE_BYTES = 512 * 1024; // 512KB +constexpr size_t JOB_SHARED_MESSAGE_BYTES = 1 << JOB_SHARED_MESSAGE_SIZE_EXP; // 128KB // the number of buckets for extra shared memory, -// it is started from (2 * JOB_SHARED_MESSAGE_BYTES) Bytes and double for the next: -// 0 => 1MB, 1 => 2MB, 2 => 4MB, 3 => 8MB, 4 => 16MB, 5 => 32MB, 6 => 64MB -constexpr size_t JOB_EXTRA_MEMORY_BUFFER_BUCKETS = 7; +// 0 => 256KB, 1 => 512KB, 2 => 1MB, 3 => 2MB, 4 => 4MB, 5 => 8MB, 6 => 16MB, 7 => 32MB, 8 => 64MB +constexpr size_t JOB_EXTRA_MEMORY_BUFFER_BUCKETS = 9; // the default multiplier for getting shared memory limit for job workers messaging: // the default value for shared memory = the processes number * JOB_DEFAULT_MEMORY_LIMIT_PROCESS_MULTIPLIER constexpr size_t JOB_DEFAULT_MEMORY_LIMIT_PROCESS_MULTIPLIER = 8 * 1024 * 1024; // 8MB for 1 process +inline int get_extra_shared_memory_buffers_group_idx(size_t size) noexcept { + return __builtin_ctzll(size) - (JOB_SHARED_MESSAGE_SIZE_EXP + 1); +} + +inline size_t get_extra_shared_memory_buffer_size(size_t group_idx) noexcept { + return 1 << (JOB_SHARED_MESSAGE_SIZE_EXP + 1 + group_idx); +} + struct JobSharedMemoryPiece; struct alignas(8) JobMetadata : vk::not_copyable { diff --git a/server/job-workers/job-stats.cpp b/server/job-workers/job-stats.cpp index 8fa95269ac..45885faa7a 100644 --- a/server/job-workers/job-stats.cpp +++ b/server/job-workers/job-stats.cpp @@ -43,6 +43,8 @@ void JobStats::write_stats_to(stats_t *stats) const noexcept { size_t currently_used = messages.write_stats_to(stats, "workers.job.memory.messages.shared_messages.", JOB_SHARED_MESSAGE_BYTES); constexpr std::array extra_memory_prefixes{ + "workers.job.memory.messages.extra_buffers.256kb.", + "workers.job.memory.messages.extra_buffers.512kb.", "workers.job.memory.messages.extra_buffers.1mb.", "workers.job.memory.messages.extra_buffers.2mb.", "workers.job.memory.messages.extra_buffers.4mb.", @@ -52,7 +54,7 @@ void JobStats::write_stats_to(stats_t *stats) const noexcept { "workers.job.memory.messages.extra_buffers.64mb.", }; for (size_t i = 0; i != JOB_EXTRA_MEMORY_BUFFER_BUCKETS; ++i) { - const size_t buffer_size = memory_resource::extra_memory_raw_bucket::get_size_by_bucket(i); + const size_t buffer_size = get_extra_shared_memory_buffer_size(i); currently_used += extra_memory[i].write_stats_to(stats, extra_memory_prefixes[i], buffer_size); } diff --git a/server/job-workers/job-worker-server.cpp b/server/job-workers/job-worker-server.cpp index 24192fb238..2eb780466c 100644 --- a/server/job-workers/job-worker-server.cpp +++ b/server/job-workers/job-worker-server.cpp @@ -262,8 +262,8 @@ bool JobWorkerServer::reply_is_expected() const noexcept { } void JobWorkerServer::flush_job_stat() noexcept { - vk::singleton::get().add_job_stats(job_stat.job_wait_time, job_stat.job_request_max_real_memory_used, job_stat.job_request_max_memory_used, - job_stat.job_response_max_real_memory_used, job_stat.job_response_max_memory_used); + vk::singleton::get().add_job_stats(job_stat.job_wait_time, job_stat.job_request_max_memory_used, job_stat.job_request_max_real_memory_used, + job_stat.job_response_max_memory_used, job_stat.job_response_max_real_memory_used); job_stat = {}; } } // namespace job_workers diff --git a/server/job-workers/shared-memory-manager.cpp b/server/job-workers/shared-memory-manager.cpp index 198e7197df..2896fa9f39 100644 --- a/server/job-workers/shared-memory-manager.cpp +++ b/server/job-workers/shared-memory-manager.cpp @@ -19,56 +19,52 @@ void SharedMemoryManager::init() noexcept { const size_t processes = vk::singleton::get().get_total_workers_count(); assert(processes); - if (!shared_messages_count_) { - auto mul = shared_messages_count_process_multiplier_ ? shared_messages_count_process_multiplier_ : JOB_DEFAULT_SHARED_MESSAGES_COUNT_PROCESS_MULTIPLIER; - shared_messages_count_ = processes * mul; - } if (!memory_limit_) { auto mul = per_process_memory_limit_ ? per_process_memory_limit_ : JOB_DEFAULT_MEMORY_LIMIT_PROCESS_MULTIPLIER; memory_limit_ = processes * mul + sizeof(ControlBlock); } auto *raw_mem = static_cast(mmap_shared(memory_limit_)); - const size_t left_memory = memory_limit_ - sizeof(ControlBlock); - const uint32_t messages_count = std::min(shared_messages_count_, left_memory / sizeof(JobSharedMessage)); + const auto *raw_mem_start = raw_mem; + const auto *raw_mem_finish = raw_mem + memory_limit_; + control_block_ = new(raw_mem) ControlBlock{}; raw_mem += sizeof(ControlBlock); + const size_t left_memory = memory_limit_ - sizeof(ControlBlock); + + std::array shared_memory_group_buffers_counts; + control_block_->stats.unused_memory = calc_shared_memory_buffers_distribution(left_memory, shared_memory_group_buffers_counts); + + const uint32_t messages_count = shared_memory_group_buffers_counts[0]; + assert(messages_count > 0); for (uint32_t i = 0; i != messages_count; ++i) { freelist_put(&control_block_->free_messages, raw_mem); raw_mem += sizeof(JobSharedMessage); } - std::array extra_memory; - control_block_->stats.unused_memory = memory_resource::distribute_memory(extra_memory, shared_messages_count_ / 2, raw_mem, - left_memory - sizeof(JobSharedMessage) * messages_count); - for (size_t i = 0; i != extra_memory.size(); ++i) { - auto &free_extra_buffers = control_block_->free_extra_memory[i]; - auto &extra_buffers = extra_memory[i]; - for (size_t mem_index = 0; mem_index != extra_buffers.buffers_count(); ++mem_index) { - freelist_put(&free_extra_buffers, extra_buffers.get_extra_pool_raw(mem_index)); + for (size_t i = 1; i < shared_memory_buffers_groups_.size(); ++i) { + const auto &cur_g = shared_memory_buffers_groups_[i]; + const size_t cur_group_buffers_cnt = shared_memory_group_buffers_counts[i]; + for (int j = 0; j < cur_group_buffers_cnt; ++j) { + freelist_put(&control_block_->free_extra_memory[i - 1], raw_mem); + raw_mem += cur_g.buffer_size; } - control_block_->stats.extra_memory[i].count = extra_buffers.buffers_count(); + control_block_->stats.extra_memory[i - 1].count = cur_group_buffers_cnt; } + assert(raw_mem_start <= raw_mem && raw_mem <= raw_mem_finish); + control_block_->stats.memory_limit = memory_limit_; control_block_->stats.messages.count = messages_count; } bool SharedMemoryManager::set_memory_limit(size_t memory_limit) noexcept { - if (memory_limit < JOB_DEFAULT_SHARED_MESSAGES_COUNT_PROCESS_MULTIPLIER * sizeof(JobSharedMessage) + sizeof(ControlBlock)) { + if (memory_limit < sizeof(JobSharedMessage) + sizeof(ControlBlock)) { return false; } memory_limit_ = memory_limit; return true; } -bool SharedMemoryManager::set_shared_messages_count(size_t shared_messages_count) noexcept { - if (shared_messages_count < JOB_DEFAULT_SHARED_MESSAGES_COUNT_PROCESS_MULTIPLIER) { - return false; - } - shared_messages_count_ = shared_messages_count; - return true; -} - bool SharedMemoryManager::set_per_process_memory_limit(size_t per_process_memory_limit) noexcept { if (per_process_memory_limit < JOB_DEFAULT_MEMORY_LIMIT_PROCESS_MULTIPLIER) { return false; @@ -77,11 +73,13 @@ bool SharedMemoryManager::set_per_process_memory_limit(size_t per_process_memory return true; } -bool SharedMemoryManager::set_shared_messages_count_process_multiplier(size_t shared_messages_count_process_multiplier) noexcept { - if (shared_messages_count_process_multiplier < JOB_DEFAULT_SHARED_MESSAGES_COUNT_PROCESS_MULTIPLIER) { +bool SharedMemoryManager::set_shared_memory_distribution_weights(const std::array &weights) noexcept { + if (weights[0] < 1e-6) { return false; } - shared_messages_count_process_multiplier_ = shared_messages_count_process_multiplier; + for (size_t i = 0; i < weights.size(); ++i) { + shared_memory_buffers_groups_[i].weight = weights[i]; + } return true; } @@ -97,7 +95,7 @@ void SharedMemoryManager::release_shared_message(JobMetadata *message) noexcept while (extra_memory->get_pool_payload_size()) { auto *releasing_extra_memory = extra_memory; extra_memory = extra_memory->next_in_chain; - const int i = memory_resource::extra_memory_raw_bucket::get_bucket(*releasing_extra_memory); + const int i = get_extra_shared_memory_buffers_group_idx(releasing_extra_memory->get_buffer_size()); assert(i >= 0 && i < control_block_->free_extra_memory.size()); freelist_put(&control_block_->free_extra_memory[i], releasing_extra_memory); ++control_block_->stats.extra_memory[i].released; @@ -134,7 +132,7 @@ void SharedMemoryManager::forcibly_release_all_attached_messages() noexcept { bool SharedMemoryManager::request_extra_memory_for_resource(memory_resource::unsynchronized_pool_resource &resource, size_t required_size) noexcept { assert(control_block_); for (size_t i = 0; i != control_block_->free_extra_memory.size(); ++i) { - const size_t buffer_real_size = memory_resource::extra_memory_raw_bucket::get_size_by_bucket(i); + const size_t buffer_real_size = get_extra_shared_memory_buffer_size(i); const size_t payload_size = memory_resource::extra_memory_pool::get_pool_payload_size(buffer_real_size); if (payload_size < required_size) { continue; @@ -157,4 +155,33 @@ JobStats &SharedMemoryManager::get_stats() noexcept { return control_block_->stats; } +size_t SharedMemoryManager::calc_shared_memory_buffers_distribution(size_t mem_size, std::array &group_buffers_counts) const noexcept { + auto calc_group_buffers_count = [&](const shared_memory_buffers_group_info &cur_g) -> size_t { + double sum = 0; + for (auto &g : shared_memory_buffers_groups_) { + sum += g.weight; + } + const double ratio = cur_g.weight / sum; + return static_cast(std::floor(mem_size * ratio / cur_g.buffer_size)); + }; + + size_t total_used_mem = 0; + for (size_t i = 0; i < shared_memory_buffers_groups_.size(); ++i) { + auto &g = shared_memory_buffers_groups_[i]; + const size_t buffers_cnt = calc_group_buffers_count(g); + group_buffers_counts[i] = buffers_cnt; + total_used_mem += g.buffer_size * buffers_cnt; + } + assert(total_used_mem <= mem_size); + // distribute left memory starting with the largest buffers + size_t unused_mem = mem_size - total_used_mem; + for (size_t i = shared_memory_buffers_groups_.size(); i-- > 0; ) { + auto &g = shared_memory_buffers_groups_[i]; + size_t cnt = unused_mem / g.buffer_size; + group_buffers_counts[i] += cnt; + unused_mem = unused_mem % g.buffer_size; + } + return unused_mem; +} + } // namespace job_workers diff --git a/server/job-workers/shared-memory-manager.h b/server/job-workers/shared-memory-manager.h index 7e9572ec5c..03603be804 100644 --- a/server/job-workers/shared-memory-manager.h +++ b/server/job-workers/shared-memory-manager.h @@ -81,9 +81,8 @@ class SharedMemoryManager : vk::not_copyable { void forcibly_release_all_attached_messages() noexcept; bool set_memory_limit(size_t memory_limit) noexcept; - bool set_shared_messages_count(size_t shared_messages_count) noexcept; bool set_per_process_memory_limit(size_t per_process_memory_limit) noexcept; - bool set_shared_messages_count_process_multiplier(size_t shared_messages_count_process_multiplier) noexcept; + bool set_shared_memory_distribution_weights(const std::array &weights) noexcept; bool request_extra_memory_for_resource(memory_resource::unsynchronized_pool_resource &resource, size_t required_size) noexcept; @@ -93,15 +92,36 @@ class SharedMemoryManager : vk::not_copyable { return control_block_; } + size_t calc_shared_memory_buffers_distribution(size_t mem_size, std::array &group_buffers_counts) const noexcept; + private: SharedMemoryManager() = default; friend class vk::singleton; size_t memory_limit_{0}; - size_t shared_messages_count_{0}; size_t per_process_memory_limit_{0}; - size_t shared_messages_count_process_multiplier_{0}; + // weights for distributing shared memory between buffers groups + // quantity of i-th memory piece is calculated like (w[i]/sum(w) * memory_limit_) / memory_piece_size + struct shared_memory_buffers_group_info { + size_t buffer_size; + double weight; + }; + std::array shared_memory_buffers_groups_{ + { + {1 << JOB_SHARED_MESSAGE_SIZE_EXP, 2}, // 128KB messages + // and extra buffers: + {1 << (JOB_SHARED_MESSAGE_SIZE_EXP + 1), 2}, // 256KB + {1 << (JOB_SHARED_MESSAGE_SIZE_EXP + 2), 2}, // 512KB + {1 << (JOB_SHARED_MESSAGE_SIZE_EXP + 3), 2}, // 1MB + {1 << (JOB_SHARED_MESSAGE_SIZE_EXP + 4), 1}, // 2MB + {1 << (JOB_SHARED_MESSAGE_SIZE_EXP + 5), 1}, // 4MB + {1 << (JOB_SHARED_MESSAGE_SIZE_EXP + 6), 1}, // 8MB + {1 << (JOB_SHARED_MESSAGE_SIZE_EXP + 7), 1}, // 16MB + {1 << (JOB_SHARED_MESSAGE_SIZE_EXP + 8), 1}, // 32MB + {1 << (JOB_SHARED_MESSAGE_SIZE_EXP + 9), 1}, // 64MB + } + }; struct alignas(8) ControlBlock { ControlBlock() noexcept { @@ -115,8 +135,7 @@ class SharedMemoryManager : vk::not_copyable { std::array workers_table{}; freelist_t free_messages{}; - // index => (1 << index) MB: - // 0 => 1MB, 1 => 2MB, 2 => 4MB, 3 => 8MB, 4 => 16MB, 5 => 32MB, 6 => 64MB + // 0 => 256KB, 1 => 512KB, 2 => 1MB, 3 => 2MB, 4 => 4MB, 5 => 8MB, 6 => 16MB, 7 => 32MB, 8 => 64MB std::array free_extra_memory{}; }; ControlBlock *control_block_{nullptr}; diff --git a/server/php-engine.cpp b/server/php-engine.cpp index a1a0c62992..c0aa9550d8 100644 --- a/server/php-engine.cpp +++ b/server/php-engine.cpp @@ -14,6 +14,7 @@ #include #include #include +#include #include #include #include @@ -2020,13 +2021,27 @@ int main_args_handler(int i, const char *long_option) { return 0; } case 2018: { - const int messages_count = atoi(optarg); - if (messages_count <= 0) { - kprintf("--%s option: couldn't parse argument\n", long_option); + constexpr size_t dist_size = 1 + job_workers::JOB_EXTRA_MEMORY_BUFFER_BUCKETS; + std::array weights; + try { + std::stringstream ss(optarg); + int num = 0; + while (ss.good()) { + std::string t; + std::getline(ss, t, ','); + auto s = vk::trim(t); + weights[num++] = std::stod(std::string{s.begin(), s.end()}); + if (num > dist_size) { + kprintf("--%s option: can't parse distribution - 10 digits expected\n", long_option); + return -1; + } + } + } catch(const std::exception &e) { + kprintf("--%s option: can't parse distribution - %s\n", long_option, e.what()); return -1; } - if (!vk::singleton::get().set_shared_messages_count(static_cast(messages_count))) { - kprintf("--%s option: too small\n", long_option); + if (!vk::singleton::get().set_shared_memory_distribution_weights(weights)) { + kprintf("--%s option: too small weight for the shared messages\n", long_option); return -1; } return 0; @@ -2150,18 +2165,6 @@ int main_args_handler(int i, const char *long_option) { } return 0; } - case 2031: { - const int messages_count_multiplier = atoi(optarg); - if (messages_count_multiplier <= 0) { - kprintf("--%s option: couldn't parse argument\n", long_option); - return -1; - } - if (!vk::singleton::get().set_shared_messages_count_process_multiplier(static_cast(messages_count_multiplier))) { - kprintf("--%s option: too small\n", long_option); - return -1; - } - return 0; - } case 2032: { std::ifstream file(optarg); if (!file) { @@ -2264,7 +2267,9 @@ void parse_main_args(int argc, char *argv[]) { parse_option("warmup-timeout", required_argument, 2015, "the maximum time for the instance cache warm up in seconds"); parse_option("job-workers-ratio", required_argument, 2016, "the jobs workers ratio of the overall workers number"); parse_option("job-workers-shared-memory-size", required_argument, 2017, "the total size of shared memory used for job workers related communication"); - parse_option("job-workers-shared-messages", required_argument, 2018, "the total count of the shared messages for job workers related communication"); + parse_option("job-workers-shared-memory-distribution-weights", required_argument, 2018, "Weights for distributing shared memory between fixed size buffers.\n" + "10 comma separated digits: '2, 2, 2, 2, 1, 1, 1, 1, 1, 1'\n" + "For each of 10 groups: 128kb, 256kb, ... , 64mb buffers."); parse_option("lease-stop-ready-timeout", required_argument, 2019, "timeout for RPC_STOP_READY acknowledgement waiting in seconds (default: 0)"); parse_option("mysql-user", required_argument, 2020, "MySQL user"); parse_option("mysql-password", required_argument, 2021, "MySQL password"); @@ -2286,8 +2291,6 @@ void parse_main_args(int argc, char *argv[]) { parse_option("sigterm-wait-time", required_argument, 2029, "Time to wait before termination on SIGTERM"); parse_option("job-workers-shared-memory-size-process-multiplier", required_argument, 2030, "Per process memory size used to calculate the total size of shared memory for job workers related communication:\n" "memory limit = per_process_memory * processes_count"); - parse_option("job-workers-shared-messages-process-multiplier", required_argument, 2031, "Coefficient used to calculate the total count of the shared messages for job workers related communication:\n" - "messages count = coefficient * processes_count"); parse_option("runtime-config", required_argument, 2032, "JSON file path that will be available at runtime as 'mixed' via 'kphp_runtime_config()"); parse_option("oom-handling-memory-ratio", required_argument, 2033, "memory ratio of overall script memory to handle OOM errors (default: 0.00)"); parse_option("hard-time-limit", required_argument, 2034, "time limit for script termination after the main timeout has expired (default: 1 sec). Use 0 to disable"); diff --git a/tests/cpp/runtime/memory_resource/extra-memory-pool-test.cpp b/tests/cpp/runtime/memory_resource/extra-memory-pool-test.cpp index 2a061ab9cc..06b57b5b99 100644 --- a/tests/cpp/runtime/memory_resource/extra-memory-pool-test.cpp +++ b/tests/cpp/runtime/memory_resource/extra-memory-pool-test.cpp @@ -12,6 +12,8 @@ namespace mr = memory_resource; constexpr size_t pool_sizeof = 16; TEST(extra_memory_pool_test, test_payload_size_member) { + ASSERT_EQ(mr::extra_memory_pool{256 * 1024}.get_pool_payload_size(), 256 * 1024 - pool_sizeof); + ASSERT_EQ(mr::extra_memory_pool{512 * 1024}.get_pool_payload_size(), 512 * 1024 - pool_sizeof); ASSERT_EQ(mr::extra_memory_pool{1 * 1024 * 1024}.get_pool_payload_size(), 1 * 1024 * 1024 - pool_sizeof); ASSERT_EQ(mr::extra_memory_pool{2 * 1024 * 1024}.get_pool_payload_size(), 2 * 1024 * 1024 - pool_sizeof); ASSERT_EQ(mr::extra_memory_pool{4 * 1024 * 1024}.get_pool_payload_size(), 4 * 1024 * 1024 - pool_sizeof); @@ -24,6 +26,8 @@ TEST(extra_memory_pool_test, test_payload_size_member) { } TEST(extra_memory_pool_test, test_payload_size_static_member) { + ASSERT_EQ(mr::extra_memory_pool::get_pool_payload_size(256 * 1024), 256 * 1024 - pool_sizeof); + ASSERT_EQ(mr::extra_memory_pool::get_pool_payload_size(512 * 1024), 512 * 1024 - pool_sizeof); ASSERT_EQ(mr::extra_memory_pool::get_pool_payload_size(1 * 1024 * 1024), 1 * 1024 * 1024 - pool_sizeof); ASSERT_EQ(mr::extra_memory_pool::get_pool_payload_size(2 * 1024 * 1024), 2 * 1024 * 1024 - pool_sizeof); ASSERT_EQ(mr::extra_memory_pool::get_pool_payload_size(4 * 1024 * 1024), 4 * 1024 * 1024 - pool_sizeof); @@ -77,112 +81,3 @@ TEST(extra_memory_pool_test, test_extra_memory_pool) { do_extra_memory_pool_test<2 * 1024 * 1024, 0x1d>(); do_extra_memory_pool_test<4 * 1024 * 1024, 0x2d>(); } - -TEST(extra_memory_pool_test, test_size_by_bucket_id) { - ASSERT_EQ(mr::extra_memory_raw_bucket::get_size_by_bucket(0), 1 * 1024 * 1024); - ASSERT_EQ(mr::extra_memory_raw_bucket::get_size_by_bucket(1), 2 * 1024 * 1024); - ASSERT_EQ(mr::extra_memory_raw_bucket::get_size_by_bucket(2), 4 * 1024 * 1024); - ASSERT_EQ(mr::extra_memory_raw_bucket::get_size_by_bucket(3), 8 * 1024 * 1024); - ASSERT_EQ(mr::extra_memory_raw_bucket::get_size_by_bucket(4), 16 * 1024 * 1024); - ASSERT_EQ(mr::extra_memory_raw_bucket::get_size_by_bucket(5), 32 * 1024 * 1024); - ASSERT_EQ(mr::extra_memory_raw_bucket::get_size_by_bucket(6), 64 * 1024 * 1024); - ASSERT_EQ(mr::extra_memory_raw_bucket::get_size_by_bucket(7), 128 * 1024 * 1024); - ASSERT_EQ(mr::extra_memory_raw_bucket::get_size_by_bucket(8), 256 * 1024 * 1024); -} - -TEST(extra_memory_pool_test, test_bucket_id_by_size) { - ASSERT_EQ(mr::extra_memory_raw_bucket::get_bucket(mr::extra_memory_pool{1 * 1024 * 1024}), 0); - ASSERT_EQ(mr::extra_memory_raw_bucket::get_bucket(mr::extra_memory_pool{2 * 1024 * 1024}), 1); - ASSERT_EQ(mr::extra_memory_raw_bucket::get_bucket(mr::extra_memory_pool{4 * 1024 * 1024}), 2); - ASSERT_EQ(mr::extra_memory_raw_bucket::get_bucket(mr::extra_memory_pool{8 * 1024 * 1024}), 3); - ASSERT_EQ(mr::extra_memory_raw_bucket::get_bucket(mr::extra_memory_pool{pool_sizeof * 1024 * 1024}), 4); - ASSERT_EQ(mr::extra_memory_raw_bucket::get_bucket(mr::extra_memory_pool{32 * 1024 * 1024}), 5); - ASSERT_EQ(mr::extra_memory_raw_bucket::get_bucket(mr::extra_memory_pool{64 * 1024 * 1024}), 6); - ASSERT_EQ(mr::extra_memory_raw_bucket::get_bucket(mr::extra_memory_pool{128 * 1024 * 1024}), 7); - ASSERT_EQ(mr::extra_memory_raw_bucket::get_bucket(mr::extra_memory_pool{256 * 1024 * 1024}), 8); -} - -TEST(extra_memory_pool_test, test_memory_distribution_not_enougth_memory) { - constexpr size_t buffer_size = 512 * 1024; - auto buffer = std::make_unique(buffer_size); - std::array extra_memory; - ASSERT_EQ(mr::distribute_memory(extra_memory, 100, buffer.get(), buffer_size), buffer_size); - for (auto &m : extra_memory) { - ASSERT_EQ(m.buffers_count(), 0); - } -} - -TEST(extra_memory_pool_test, test_memory_distribution_left_some_memory) { - constexpr size_t buffer_size = 5 * 1024 * 1024 + 123456; - auto buffer = std::make_unique(buffer_size); - std::array extra_memory; - ASSERT_EQ(mr::distribute_memory(extra_memory, 2, buffer.get(), buffer_size), 123456); - - ASSERT_EQ(extra_memory[0].buffers_count(), 3); - ASSERT_EQ(extra_memory[0].get_extra_pool_raw(0), buffer.get()); - ASSERT_EQ(extra_memory[0].get_extra_pool_raw(1), buffer.get() + 1 * 1024 * 1024); - ASSERT_EQ(extra_memory[0].get_extra_pool_raw(2), buffer.get() + 2 * 1024 * 1024); - - ASSERT_EQ(extra_memory[1].buffers_count(), 1); - ASSERT_EQ(extra_memory[1].get_extra_pool_raw(0), buffer.get() + 3 * 1024 * 1024); - - for (size_t i = 2; i != extra_memory.size(); ++i) { - ASSERT_EQ(extra_memory[i].buffers_count(), 0); - } -} - -TEST(extra_memory_pool_test, test_memory_distribution_more_than_required) { - constexpr size_t buffer_size = 701 * 1024 * 1024 + 123456; - std::array extra_memory; - ASSERT_EQ(mr::distribute_memory(extra_memory, 2, nullptr, buffer_size), 123456); - // 0 + 3*1 = 3MB - ASSERT_EQ(extra_memory[0].buffers_count(), 3); - ASSERT_EQ(extra_memory[0].get_extra_pool_raw(0), reinterpret_cast(0)); - // 3 + 1*2 = 5MB - ASSERT_EQ(extra_memory[1].buffers_count(), 1); - ASSERT_EQ(extra_memory[1].get_extra_pool_raw(0), reinterpret_cast(3 * 1024 * 1024)); - // 5 + 2*4 = 13MB - ASSERT_EQ(extra_memory[2].buffers_count(), 2); - ASSERT_EQ(extra_memory[2].get_extra_pool_raw(0), reinterpret_cast(5 * 1024 * 1024)); - // 13 + 2*8 = 29MB - ASSERT_EQ(extra_memory[3].buffers_count(), 2); - ASSERT_EQ(extra_memory[3].get_extra_pool_raw(0), reinterpret_cast(13 * 1024 * 1024)); - // 29 + 2*16 = 61MB - ASSERT_EQ(extra_memory[4].buffers_count(), 2); - ASSERT_EQ(extra_memory[4].get_extra_pool_raw(0), reinterpret_cast(29 * 1024 * 1024)); - // 61 + 2*32 = 125MB - ASSERT_EQ(extra_memory[5].buffers_count(), 2); - ASSERT_EQ(extra_memory[5].get_extra_pool_raw(0), reinterpret_cast(61 * 1024 * 1024)); - // 125 + 9*64 = 701MB - ASSERT_EQ(extra_memory[6].buffers_count(), 9); - ASSERT_EQ(extra_memory[6].get_extra_pool_raw(0), reinterpret_cast(125 * 1024 * 1024)); -} - -TEST(extra_memory_pool_test, test_memory_distribution) { - constexpr size_t processes = 300; - // 300*7 = 2100MB - constexpr size_t buffer_size = processes * 7 * 1024 * 1024; - std::array extra_memory; - ASSERT_EQ(mr::distribute_memory(extra_memory, processes, nullptr, buffer_size), 0); - // 0 + 300*1 = 300MB // left 1800MB - ASSERT_EQ(extra_memory[0].buffers_count(), processes); - ASSERT_EQ(extra_memory[0].get_extra_pool_raw(0), reinterpret_cast(0)); - // 300 + 150*2 = 600MB // left 1500MB - ASSERT_EQ(extra_memory[1].buffers_count(), processes / 2); - ASSERT_EQ(extra_memory[1].get_extra_pool_raw(0), reinterpret_cast(processes * 1024 * 1024)); - // 600 + 75*4 = 900MB // left 1200MB - ASSERT_EQ(extra_memory[2].buffers_count(), processes / 4); - ASSERT_EQ(extra_memory[2].get_extra_pool_raw(0), reinterpret_cast(processes * 2 * 1024 * 1024)); - // 900 + 38*8 = 1204MB // left 896MB - ASSERT_EQ(extra_memory[3].buffers_count(), 1 + processes / 8); - ASSERT_EQ(extra_memory[3].get_extra_pool_raw(0), reinterpret_cast(processes * 3 * 1024 * 1024)); - // 1204 + 18*16 = 1492MB // left 608MB - ASSERT_EQ(extra_memory[4].buffers_count(), processes / 16); - ASSERT_EQ(extra_memory[4].get_extra_pool_raw(0), reinterpret_cast((processes * 4 + 4) * 1024 * 1024)); - // 1492 + 9*32 = 1780MB // left 320MB - ASSERT_EQ(extra_memory[5].buffers_count(), processes / 32); - ASSERT_EQ(extra_memory[5].get_extra_pool_raw(0), reinterpret_cast((processes * 5 - 8) * 1024 * 1024)); - // 1780 + 5*64 = 2100MB - ASSERT_EQ(extra_memory[6].buffers_count(), 1 + processes / 64); - ASSERT_EQ(extra_memory[6].get_extra_pool_raw(0), reinterpret_cast((processes * 6 - 20) * 1024 * 1024)); -} diff --git a/tests/cpp/server/job-workers/shared-memory-manager-test.cpp b/tests/cpp/server/job-workers/shared-memory-manager-test.cpp index a1fffc1abc..0696dc50a2 100644 --- a/tests/cpp/server/job-workers/shared-memory-manager-test.cpp +++ b/tests/cpp/server/job-workers/shared-memory-manager-test.cpp @@ -6,6 +6,7 @@ #include "common/macos-ports.h" +#include "runtime/memory_resource/extra-memory-pool.h" #include "server/job-workers/job-message.h" #include "server/job-workers/job-stats.h" #include "server/job-workers/job-workers-context.h" @@ -53,6 +54,8 @@ struct AcquiredReleased { void worker_function() { AcquiredReleased messages; + AcquiredReleased _256kb; + AcquiredReleased _512kb; AcquiredReleased _1mb; AcquiredReleased _2mb; AcquiredReleased _4mb; @@ -75,33 +78,45 @@ void worker_function() { auto *message_with_extra_memory = SHMM::get().acquire_shared_message(); ASSERT_ACQUIRED(SHMM::get().get_stats().messages, messages); check_new_message(message_with_extra_memory); - // 1mb x 8 + // 256kb + ASSERT_FALSE(message_with_extra_memory->resource.is_enough_memory_for(200 * 1024)); + ASSERT_TRUE(SHMM::get().request_extra_memory_for_resource(message_with_extra_memory->resource, 200 * 1024)); + ASSERT_ACQUIRED(SHMM::get().get_stats().extra_memory[0], _256kb); + ASSERT_TRUE(message_with_extra_memory->resource.is_enough_memory_for(200 * 1024)); + // 512kb + ASSERT_FALSE(message_with_extra_memory->resource.is_enough_memory_for(400 * 1024)); + ASSERT_TRUE(SHMM::get().request_extra_memory_for_resource(message_with_extra_memory->resource, 400 * 1024)); + ASSERT_ACQUIRED(SHMM::get().get_stats().extra_memory[1], _512kb); + ASSERT_TRUE(message_with_extra_memory->resource.is_enough_memory_for(400 * 1024)); + // 1mb ASSERT_FALSE(message_with_extra_memory->resource.is_enough_memory_for(800 * 1024)); ASSERT_TRUE(SHMM::get().request_extra_memory_for_resource(message_with_extra_memory->resource, 800 * 1024)); - ASSERT_ACQUIRED(SHMM::get().get_stats().extra_memory[0], _1mb); + ASSERT_ACQUIRED(SHMM::get().get_stats().extra_memory[2], _1mb); ASSERT_TRUE(message_with_extra_memory->resource.is_enough_memory_for(800 * 1024)); - // 2mb x 4 + // 2mb if (vk::any_of_equal(logname_id, 1, 2, 3, 4)) { ASSERT_FALSE(message_with_extra_memory->resource.is_enough_memory_for(1500 * 1024)); ASSERT_TRUE(SHMM::get().request_extra_memory_for_resource(message_with_extra_memory->resource, 1500 * 1024)); - ASSERT_ACQUIRED(SHMM::get().get_stats().extra_memory[1], _2mb); + ASSERT_ACQUIRED(SHMM::get().get_stats().extra_memory[3], _2mb); ASSERT_TRUE(message_with_extra_memory->resource.is_enough_memory_for(1500 * 1024)); } - // 4mb x 2 + // 4mb if (vk::any_of_equal(logname_id, 1, 5)) { ASSERT_FALSE(message_with_extra_memory->resource.is_enough_memory_for(3 * 1024 * 1024)); ASSERT_TRUE(SHMM::get().request_extra_memory_for_resource(message_with_extra_memory->resource, 3 * 1024 * 1024)); - ASSERT_ACQUIRED(SHMM::get().get_stats().extra_memory[2], _4mb); + ASSERT_ACQUIRED(SHMM::get().get_stats().extra_memory[4], _4mb); ASSERT_TRUE(message_with_extra_memory->resource.is_enough_memory_for(3 * 1024 * 1024)); } SHMM::get().release_shared_message(message_with_extra_memory); ASSERT_RELEASED(SHMM::get().get_stats().messages, messages); - ASSERT_RELEASED(SHMM::get().get_stats().extra_memory[0], _1mb); + ASSERT_RELEASED(SHMM::get().get_stats().extra_memory[0], _256kb); + ASSERT_RELEASED(SHMM::get().get_stats().extra_memory[1], _512kb); + ASSERT_RELEASED(SHMM::get().get_stats().extra_memory[2], _1mb); if (vk::any_of_equal(logname_id, 1, 2, 3, 4)) { - ASSERT_RELEASED(SHMM::get().get_stats().extra_memory[1], _2mb); + ASSERT_RELEASED(SHMM::get().get_stats().extra_memory[3], _2mb); } if (vk::any_of_equal(logname_id, 1, 5)) { - ASSERT_RELEASED(SHMM::get().get_stats().extra_memory[2], _4mb); + ASSERT_RELEASED(SHMM::get().get_stats().extra_memory[4], _4mb); } } } @@ -114,13 +129,13 @@ void worker_function() { TEST(shared_memory_manager_test, test_manager) { SHMM::get().set_memory_limit(256 * 1024 * 1024); + SHMM::get().set_shared_memory_distribution_weights({2, 2, 2, 2, 1, 1, 1, 1, 1, 1}); vk::singleton::get().set_total_workers_count(7); SHMM::get().init(); const auto &stats = SHMM::get().get_stats(); ASSERT_EQ(stats.memory_limit, 256 * 1024 * 1024); - ASSERT_GT(stats.unused_memory, 980000); // 1mb == 1 048 576 - ASSERT_LT(stats.unused_memory, 1048576); + ASSERT_LT(stats.unused_memory, 1 << JOB_SHARED_MESSAGE_SIZE_EXP); std::array children{}; for (int i = 0; i != children.size(); ++i) { @@ -141,19 +156,115 @@ TEST(shared_memory_manager_test, test_manager) { ASSERT_EQ(WEXITSTATUS(status), 0); } - ASSERT_BUFFER(stats.messages, 14, 150000); + ASSERT_BUFFER(stats.messages, 293, 150000); + // 256kb + ASSERT_BUFFER(stats.extra_memory[0], 147, 50000); + // 512kb + ASSERT_BUFFER(stats.extra_memory[1], 73, 50000); // 1mb - ASSERT_BUFFER(stats.extra_memory[0], 8, 50000); + ASSERT_BUFFER(stats.extra_memory[2], 36, 50000); // 2mb - ASSERT_BUFFER(stats.extra_memory[1], 4, 40000); + ASSERT_BUFFER(stats.extra_memory[3], 9, 40000); // 4mb - ASSERT_BUFFER(stats.extra_memory[2], 2, 20000); + ASSERT_BUFFER(stats.extra_memory[4], 5, 20000); // 8mb - ASSERT_BUFFER(stats.extra_memory[3], 2, 0); + ASSERT_BUFFER(stats.extra_memory[5], 3, 0); // 16mb - ASSERT_BUFFER(stats.extra_memory[4], 1, 0); + ASSERT_BUFFER(stats.extra_memory[6], 1, 0); // 32mb - ASSERT_BUFFER(stats.extra_memory[5], 2, 0); + ASSERT_BUFFER(stats.extra_memory[7], 1, 0); // 64mb - ASSERT_BUFFER(stats.extra_memory[6], 2, 0); + ASSERT_BUFFER(stats.extra_memory[8], 0, 0); +} + + +TEST(shared_memory_manager_test, test_size_by_bucket_id) { + ASSERT_EQ(get_extra_shared_memory_buffer_size(0), 256 * 1024); + ASSERT_EQ(get_extra_shared_memory_buffer_size(1), 512 * 1024); + ASSERT_EQ(get_extra_shared_memory_buffer_size(2), 1 * 1024 * 1024); + ASSERT_EQ(get_extra_shared_memory_buffer_size(3), 2 * 1024 * 1024); + ASSERT_EQ(get_extra_shared_memory_buffer_size(4), 4 * 1024 * 1024); + ASSERT_EQ(get_extra_shared_memory_buffer_size(5), 8 * 1024 * 1024); + ASSERT_EQ(get_extra_shared_memory_buffer_size(6), 16 * 1024 * 1024); + ASSERT_EQ(get_extra_shared_memory_buffer_size(7), 32 * 1024 * 1024); + ASSERT_EQ(get_extra_shared_memory_buffer_size(8), 64 * 1024 * 1024); +} + +TEST(shared_memory_manager_test, test_bucket_id_by_size) { + ASSERT_EQ(get_extra_shared_memory_buffers_group_idx(256 * 1024), 0); + ASSERT_EQ(get_extra_shared_memory_buffers_group_idx(512 * 1024), 1); + ASSERT_EQ(get_extra_shared_memory_buffers_group_idx(1 * 1024 * 1024), 2); + ASSERT_EQ(get_extra_shared_memory_buffers_group_idx(2 * 1024 * 1024), 3); + ASSERT_EQ(get_extra_shared_memory_buffers_group_idx(4 * 1024 * 1024), 4); + ASSERT_EQ(get_extra_shared_memory_buffers_group_idx(8 * 1024 * 1024), 5); + ASSERT_EQ(get_extra_shared_memory_buffers_group_idx(16 * 1024 * 1024), 6); + ASSERT_EQ(get_extra_shared_memory_buffers_group_idx(32 * 1024 * 1024), 7); + ASSERT_EQ(get_extra_shared_memory_buffers_group_idx(64 * 1024 * 1024), 8); +} + +void check_distribution(size_t buffer_size, const std::array &actual, const std::array &expected) { + size_t total_used_mem = 0; + for (int i = 0; i < 10; ++i) { + size_t cnt = actual[i]; + ASSERT_EQ(cnt, expected[i]); + total_used_mem += cnt * (1 << (JOB_SHARED_MESSAGE_SIZE_EXP + i)); + } + ASSERT_LE(total_used_mem, buffer_size); +} + +TEST(shared_memory_manager_test, test_memory_distribution_not_enough_memory) { + const size_t buffer_size = JOB_SHARED_MESSAGE_BYTES - 1; + SHMM::get().set_shared_memory_distribution_weights({2, 2, 2, 2, 1, 1, 1, 1, 1, 1}); + std::array buffers_distribution; + ASSERT_EQ(SHMM::get().calc_shared_memory_buffers_distribution(buffer_size, buffers_distribution), buffer_size); + check_distribution(buffer_size, buffers_distribution, {0, 0, 0, 0, 0, 0, 0, 0, 0, 0}); +} + +TEST(shared_memory_manager_test, test_memory_distribution_left_some_memory) { + constexpr size_t buffer_size = 5 * 1024 * 1024 + (JOB_SHARED_MESSAGE_BYTES - 1); + auto buffer = std::make_unique(buffer_size); + SHMM::get().set_shared_memory_distribution_weights({2, 2, 2, 2, 1, 1, 1, 1, 1, 1}); + std::array buffers_distribution; + ASSERT_EQ(SHMM::get().calc_shared_memory_buffers_distribution(buffer_size, buffers_distribution), (JOB_SHARED_MESSAGE_BYTES - 1)); + check_distribution(buffer_size, buffers_distribution, {6, 3, 1, 1, 1, 0, 0, 0, 0, 0}); +} + +TEST(shared_memory_manager_test, test_memory_distribution_more_than_required) { + constexpr size_t buffer_size = 701 * 1024 * 1024 + (JOB_SHARED_MESSAGE_BYTES - 1); + SHMM::get().set_shared_memory_distribution_weights({2, 2, 2, 2, 1, 1, 1, 1, 1, 1}); + std::array buffers_distribution; + ASSERT_EQ(SHMM::get().calc_shared_memory_buffers_distribution(buffer_size, buffers_distribution), (JOB_SHARED_MESSAGE_BYTES - 1)); + check_distribution(buffer_size, buffers_distribution, {802, 401, 201, 100, 26, 12, 7, 3, 1, 1}); +} + +TEST(shared_memory_manager_test, test_no_extra_buffers) { + constexpr size_t buffer_size = 100 * JOB_SHARED_MESSAGE_BYTES; + SHMM::get().set_shared_memory_distribution_weights({1, 0, 0, 0, 0, 0, 0, 0, 0, 0}); + std::array buffers_distribution; + ASSERT_EQ(SHMM::get().calc_shared_memory_buffers_distribution(buffer_size, buffers_distribution), 0); + check_distribution(buffer_size, buffers_distribution, {100, 0, 0, 0, 0, 0, 0, 0, 0, 0}); +} + +TEST(shared_memory_manager_test, test_equal_distribution) { + constexpr size_t buffer_size = 512l * 10 * 1024 * 1024; + SHMM::get().set_shared_memory_distribution_weights({1, 1, 1, 1, 1, 1, 1, 1, 1, 1}); + std::array buffers_distribution; + ASSERT_EQ(SHMM::get().calc_shared_memory_buffers_distribution(buffer_size, buffers_distribution), 0); + check_distribution(buffer_size, buffers_distribution, {4096, 2048, 1024, 512, 256, 128, 64, 32, 16, 8}); +} + +TEST(shared_memory_manager_test, test_float_distribution) { + constexpr size_t buffer_size = 300l * 7 * 1024 * 1024; // 300*7 = 2100MB + SHMM::get().set_shared_memory_distribution_weights({0.5, 0.5, 0.5, 0.5, 0.25, 0.25, 0.25, 0.25, 0.25, 0.25}); + std::array buffers_distribution; + ASSERT_EQ(SHMM::get().calc_shared_memory_buffers_distribution(buffer_size, buffers_distribution), 0); + check_distribution(buffer_size, buffers_distribution, {2400, 1200, 600, 300, 76, 37, 19, 10, 5, 2}); +} + +TEST(shared_memory_manager_test, test_memory_distribution) { + constexpr size_t buffer_size = 300l * 7 * 1024 * 1024; // 300*7 = 2100MB + SHMM::get().set_shared_memory_distribution_weights({2, 2, 2, 2, 1, 1, 1, 1, 1, 1}); + std::array buffers_distribution; + ASSERT_EQ(SHMM::get().calc_shared_memory_buffers_distribution(buffer_size, buffers_distribution), 0); + check_distribution(buffer_size, buffers_distribution, {2400, 1200, 600, 300, 76, 37, 19, 10, 5, 2}); } diff --git a/tests/phpt/kphp_configuration/8_warm_up_configuration.php b/tests/phpt/kphp_configuration/8_oom_handling_memory_ratio.php similarity index 100% rename from tests/phpt/kphp_configuration/8_warm_up_configuration.php rename to tests/phpt/kphp_configuration/8_oom_handling_memory_ratio.php diff --git a/tests/phpt/kphp_configuration/9_job_workers_shared_memory_distribution_weights.php b/tests/phpt/kphp_configuration/9_job_workers_shared_memory_distribution_weights.php new file mode 100644 index 0000000000..d891449589 --- /dev/null +++ b/tests/phpt/kphp_configuration/9_job_workers_shared_memory_distribution_weights.php @@ -0,0 +1,10 @@ +@ok + "2, 1, 1, 1, 1, 1, 1, 1, 1, 1", + ]; +} + +echo "successfully compiled\n"; diff --git a/tests/python/tests/job_workers/test_complex_scenario_job.py b/tests/python/tests/job_workers/test_complex_scenario_job.py index 3f48778478..726d821efd 100644 --- a/tests/python/tests/job_workers/test_complex_scenario_job.py +++ b/tests/python/tests/job_workers/test_complex_scenario_job.py @@ -8,6 +8,7 @@ class TestComplexScenarioJob(KphpServerAutoTestCase): def extra_class_setup(cls): cls.kphp_server.update_options({ "--workers-num": 18, + "--job-workers-shared-memory-distribution-weights": '2,2,2,2,1,1,1,1,1,1', "--job-workers-ratio": 0.16, "--verbosity-job-workers=2": True, }) @@ -62,11 +63,12 @@ def test_complex_scenario_job(self): for _ in pool.imap_unordered(self.do_test, range(requests_count)): pass self.kphp_server.assert_stats( + timeout=10, prefix="kphp_server.workers_job_", expected_added_stats={ "memory_messages_shared_messages_buffers_acquired": requests_count * 10, "memory_messages_shared_messages_buffers_released": requests_count * 10, - "memory_messages_shared_messages_buffers_reserved": 2 * (15 + 3), + "memory_messages_shared_messages_buffers_reserved": 164, "jobs_queue_size": 0, "jobs_sent": requests_count * 5, "jobs_replied": requests_count * 5, diff --git a/tests/python/tests/job_workers/test_job_errors.py b/tests/python/tests/job_workers/test_job_errors.py index e2a59dc079..32dc7cf12d 100644 --- a/tests/python/tests/job_workers/test_job_errors.py +++ b/tests/python/tests/job_workers/test_job_errors.py @@ -16,6 +16,7 @@ def extra_class_setup(cls): cls.kphp_server.update_options({ "--workers-num": 4, "--job-workers-ratio": 0.5, + "--job-workers-shared-memory-distribution-weights": '2,2,2,2,1,1,1,1,1,1', "--verbosity-job-workers=2": True, }) @@ -109,10 +110,12 @@ def test_job_client_oom(self): prefix="kphp_server.workers_job_memory_messages_extra_buffers_", initial_stats=stats_extra_buffers_before, expected_added_stats={ - "1mb_buffer_acquire_fails": 6, "1mb_buffers_acquired": 4, "1mb_buffers_released": 4, - "2mb_buffer_acquire_fails": 4, "2mb_buffers_acquired": 2, "2mb_buffers_released": 2, - "4mb_buffer_acquire_fails": 3, "4mb_buffers_acquired": 1, "4mb_buffers_released": 1, - "8mb_buffer_acquire_fails": 1, "8mb_buffers_acquired": 2, "8mb_buffers_released": 2, + "256kb_buffer_acquire_fails": 18, "256kb_buffers_acquired": 18, "256kb_buffers_released": 18, + "512kb_buffer_acquire_fails": 8, "512kb_buffers_acquired": 10, "512kb_buffers_released": 10, + "1mb_buffer_acquire_fails": 4, "1mb_buffers_acquired": 4, "1mb_buffers_released": 4, + "2mb_buffer_acquire_fails": 3, "2mb_buffers_acquired": 1, "2mb_buffers_released": 1, + "4mb_buffer_acquire_fails": 2, "4mb_buffers_acquired": 1, "4mb_buffers_released": 1, + "8mb_buffer_acquire_fails": 1, "8mb_buffers_acquired": 1, "8mb_buffers_released": 1, "16mb_buffer_acquire_fails": 1, "16mb_buffers_acquired": 0, "16mb_buffers_released": 0, "32mb_buffer_acquire_fails": 1, "32mb_buffers_acquired": 0, "32mb_buffers_released": 0, "64mb_buffer_acquire_fails": 1, "64mb_buffers_acquired": 0, "64mb_buffers_released": 0, @@ -125,20 +128,19 @@ def test_big_response(self): "Warning: Can't store job response X2Response: too big response" ]) self.kphp_server.assert_stats( + timeout=10, prefix="kphp_server.workers_job_memory_messages_extra_buffers_", initial_stats=stats_extra_buffers_before, expected_added_stats={ - "1mb_buffer_acquire_fails": self.cmpGe(6), "1mb_buffers_acquired": self.cmpGe(4), - "1mb_buffers_released": self.cmpGe(4), "2mb_buffers_released": self.cmpGe(2), - "2mb_buffer_acquire_fails": self.cmpGe(4), "2mb_buffers_acquired": self.cmpGe(2), - "4mb_buffer_acquire_fails": self.cmpGe(3), "4mb_buffers_acquired": self.cmpGe(1), - "4mb_buffers_released": self.cmpGe(1), "8mb_buffers_released": self.cmpGe(2), - "8mb_buffer_acquire_fails": self.cmpGe(1), "8mb_buffers_acquired": self.cmpGe(2), - "16mb_buffer_acquire_fails": self.cmpGe(1), "16mb_buffers_acquired": self.cmpGe(0), - "16mb_buffers_released": self.cmpGe(0), "32mb_buffers_released": self.cmpGe(0), - "32mb_buffer_acquire_fails": self.cmpGe(1), "32mb_buffers_acquired": self.cmpGe(0), - "64mb_buffer_acquire_fails": self.cmpGe(1), "64mb_buffers_acquired": self.cmpGe(0), - "64mb_buffers_released": self.cmpGe(0), + "256kb_buffer_acquire_fails": self.cmpGe(18), "256kb_buffers_acquired": self.cmpGe(18), "256kb_buffers_released": self.cmpGe(18), + "512kb_buffer_acquire_fails": self.cmpGe(8), "512kb_buffers_acquired": self.cmpGe(10), "512kb_buffers_released": self.cmpGe(10), + "1mb_buffer_acquire_fails": self.cmpGe(4), "1mb_buffers_acquired": self.cmpGe(4), "1mb_buffers_released": self.cmpGe(4), + "2mb_buffer_acquire_fails": self.cmpGe(3), "2mb_buffers_acquired": self.cmpGe(1), "2mb_buffers_released": self.cmpGe(1), + "4mb_buffer_acquire_fails": self.cmpGe(2), "4mb_buffers_acquired": self.cmpGe(1), "4mb_buffers_released": self.cmpGe(1), + "8mb_buffer_acquire_fails": self.cmpGe(1), "8mb_buffers_acquired": self.cmpGe(1), "8mb_buffers_released": self.cmpGe(1), + "16mb_buffer_acquire_fails": self.cmpGe(1), "16mb_buffers_acquired": self.cmpGe(0), "16mb_buffers_released": self.cmpGe(0), + "32mb_buffer_acquire_fails": self.cmpGe(1), "32mb_buffers_acquired": self.cmpGe(0), "32mb_buffers_released": self.cmpGe(0), + "64mb_buffer_acquire_fails": self.cmpGe(1), "64mb_buffers_acquired": self.cmpGe(0), "64mb_buffers_released": self.cmpGe(0), }) def test_client_wait_false(self): diff --git a/tests/python/tests/job_workers/test_job_shared_immutable_message.py b/tests/python/tests/job_workers/test_job_shared_immutable_message.py index b0a5870bc3..98b5deaeb3 100644 --- a/tests/python/tests/job_workers/test_job_shared_immutable_message.py +++ b/tests/python/tests/job_workers/test_job_shared_immutable_message.py @@ -11,7 +11,7 @@ def extra_class_setup(cls): "--job-workers-ratio": 0.8, "--verbosity-job-workers=2": True, "--job-workers-shared-memory-size": "80M", - + "--job-workers-shared-memory-distribution-weights": '1,1,1,1,1,1,1,4,1,1', }) def test_error_different_shared_memory_pieces(self): diff --git a/tests/python/tests/job_workers/test_job_shared_messages.py b/tests/python/tests/job_workers/test_job_shared_messages.py index 7409895ed4..cee94ce8cc 100644 --- a/tests/python/tests/job_workers/test_job_shared_messages.py +++ b/tests/python/tests/job_workers/test_job_shared_messages.py @@ -2,13 +2,18 @@ class TestJobSharedMessages(KphpServerAutoTestCase): + DEFAULT_DISTRIBUTION_STR = " \n 2 ,2, 2,\t 2, 1,\n 1, \n1,1 ,1 , 1 \n" + def _do_test(self, kphp_options, reserved): self.kphp_server.update_options(kphp_options) self.kphp_server.restart() self.kphp_server.assert_stats( + timeout=10, prefix="kphp_server.workers_job_memory_messages_", expected_added_stats={ "shared_messages_buffers_reserved": reserved["messages"], + "extra_buffers_256kb_buffers_reserved": reserved["256kb"], + "extra_buffers_512kb_buffers_reserved": reserved["512kb"], "extra_buffers_1mb_buffers_reserved": reserved["1mb"], "extra_buffers_2mb_buffers_reserved": reserved["2mb"], "extra_buffers_4mb_buffers_reserved": reserved["4mb"], @@ -18,101 +23,95 @@ def _do_test(self, kphp_options, reserved): "extra_buffers_64mb_buffers_reserved": reserved["64mb"], }) - def test_default(self): + def test_default_shared_memory_size_option(self): self._do_test( kphp_options={ "--workers-num": 4, "--job-workers-ratio": 0.5, - "--job-workers-shared-messages": None, "--job-workers-shared-memory-size": None + "--job-workers-shared-memory-distribution-weights": self.DEFAULT_DISTRIBUTION_STR, "--job-workers-shared-memory-size": None }, - reserved={"messages": 8, "1mb": 4, "2mb": 2, "4mb": 1, "8mb": 2, "16mb": 0, "32mb": 0, "64mb": 0} + reserved={"messages": 36, "256kb": 18, "512kb": 10, "1mb": 4, "2mb": 1, "4mb": 1, "8mb": 1, "16mb": 0, "32mb": 0, "64mb": 0} ) self._do_test( kphp_options={ "--workers-num": 10, "--job-workers-ratio": 0.2, - "--job-workers-shared-messages": None, "--job-workers-shared-memory-size": None + "--job-workers-shared-memory-distribution-weights": self.DEFAULT_DISTRIBUTION_STR, "--job-workers-shared-memory-size": None }, - reserved={"messages": 20, "1mb": 10, "2mb": 6, "4mb": 2, "8mb": 1, "16mb": 2, "32mb": 0, "64mb": 0} + reserved={"messages": 92, "256kb": 46, "512kb": 22, "1mb": 12, "2mb": 3, "4mb": 1, "8mb": 1, "16mb": 1, "32mb": 0, "64mb": 0} ) - def test_job_workers_shared_memory_size_option(self): + def test_custom_shared_memory_size_option(self): self._do_test( kphp_options={ "--workers-num": 4, "--job-workers-ratio": 0.5, - "--job-workers-shared-messages": None, "--job-workers-shared-memory-size": "120M" + "--job-workers-shared-memory-distribution-weights": self.DEFAULT_DISTRIBUTION_STR, "--job-workers-shared-memory-size": "120M" }, - reserved={"messages": 8, "1mb": 5, "2mb": 3, "4mb": 2, "8mb": 2, "16mb": 1, "32mb": 2, "64mb": 0} + reserved={"messages": 137, "256kb": 69, "512kb": 35, "1mb": 18, "2mb": 5, "4mb": 2, "8mb": 2, "16mb": 1, "32mb": 0, "64mb": 0} ) self._do_test( kphp_options={ "--workers-num": 10, "--job-workers-ratio": 0.2, - "--job-workers-shared-messages": None, "--job-workers-shared-memory-size": "500M" + "--job-workers-shared-memory-distribution-weights": self.DEFAULT_DISTRIBUTION_STR, "--job-workers-shared-memory-size": "500M" }, - reserved={"messages": 20, "1mb": 11, "2mb": 5, "4mb": 3, "8mb": 1, "16mb": 2, "32mb": 1, "64mb": 6} + reserved={"messages": 571, "256kb": 286, "512kb": 142, "1mb": 72, "2mb": 17, "4mb": 9, "8mb": 4, "16mb": 3, "32mb": 2, "64mb": 0} ) self._do_test( kphp_options={ "--workers-num": 10, "--job-workers-ratio": 0.2, - "--job-workers-shared-messages": None, "--job-workers-shared-memory-size": "7M" + "--job-workers-shared-memory-distribution-weights": self.DEFAULT_DISTRIBUTION_STR, "--job-workers-shared-memory-size": "7M" }, - reserved={"messages": 13, "1mb": 0, "2mb": 0, "4mb": 0, "8mb": 0, "16mb": 0, "32mb": 0, "64mb": 0} + reserved={"messages": 7, "256kb": 4, "512kb": 2, "1mb": 0, "2mb": 0, "4mb": 1, "8mb": 0, "16mb": 0, "32mb": 0, "64mb": 0} ) - def test_job_workers_shared_messages_option(self): + def test_default_shared_memory_distribution_option(self): self._do_test( kphp_options={ "--workers-num": 4, "--job-workers-ratio": 0.5, - "--job-workers-shared-messages": 50, "--job-workers-shared-memory-size": None - }, - reserved={"messages": 50, "1mb": 7, "2mb": 0, "4mb": 0, "8mb": 0, "16mb": 0, "32mb": 0, "64mb": 0} - ) - self._do_test( - kphp_options={ - "--workers-num": 10, "--job-workers-ratio": 0.2, - "--job-workers-shared-messages": 100, "--job-workers-shared-memory-size": None + "--job-workers-shared-memory-distribution-weights": None, "--job-workers-shared-memory-size": None }, - reserved={"messages": 100, "1mb": 30, "2mb": 0, "4mb": 0, "8mb": 0, "16mb": 0, "32mb": 0, "64mb": 0} + reserved={"messages": 36, "256kb": 18, "512kb": 10, "1mb": 4, "2mb": 1, "4mb": 1, "8mb": 1, "16mb": 0, + "32mb": 0, "64mb": 0} ) self._do_test( kphp_options={ - "--workers-num": 4, "--job-workers-ratio": 0.5, - "--job-workers-shared-messages": 100, "--job-workers-shared-memory-size": None + "--workers-num": 10, "--job-workers-ratio": 0.5, + "--job-workers-shared-memory-distribution-weights": None, "--job-workers-shared-memory-size": None }, - reserved={"messages": 64, "1mb": 0, "2mb": 0, "4mb": 0, "8mb": 0, "16mb": 0, "32mb": 0, "64mb": 0} - ) - self._do_test( - kphp_options={ - "--workers-num": 10, "--job-workers-ratio": 0.2, - "--job-workers-shared-messages": 10, "--job-workers-shared-memory-size": None - }, - reserved={"messages": 10, "1mb": 5, "2mb": 3, "4mb": 2, "8mb": 1, "16mb": 1, "32mb": 1, "64mb": 0} + reserved={"messages": 92, "256kb": 46, "512kb": 22, "1mb": 12, "2mb": 3, "4mb": 1, "8mb": 1, "16mb": 1, "32mb": 0, "64mb": 0} ) - def test_job_workers_shared_messages_and_memory_size_option(self): + def test_custom_shared_memory_distribution_option(self): self._do_test( kphp_options={ - "--workers-num": 4, "--job-workers-ratio": 0.5, - "--job-workers-shared-messages": 50, "--job-workers-shared-memory-size": "120M" + "--workers-num": 2, "--job-workers-ratio": 0.5, + "--job-workers-shared-memory-distribution-weights": "0.5,0.5,0.5,0.5,0.25,0.25, 0.25, 0.25, 0.25, 0.25", + "--job-workers-shared-memory-size": "120M" }, - reserved={"messages": 50, "1mb": 26, "2mb": 12, "4mb": 7, "8mb": 2, "16mb": 0, "32mb": 0, "64mb": 0} + reserved={"messages": 137, "256kb": 69, "512kb": 35, "1mb": 18, "2mb": 5, "4mb": 2, "8mb": 2, "16mb": 1, "32mb": 0, "64mb": 0} ) + self._do_test( kphp_options={ - "--workers-num": 10, "--job-workers-ratio": 0.2, - "--job-workers-shared-messages": 100, "--job-workers-shared-memory-size": "500M" + "--workers-num": 2, "--job-workers-ratio": 0.5, + "--job-workers-shared-memory-distribution-weights": "1, 0, 0, 0, 0, 0, 0, 0, 0, 1", + "--job-workers-shared-memory-size": "128M" }, - reserved={"messages": 100, "1mb": 51, "2mb": 25, "4mb": 13, "8mb": 7, "16mb": 3, "32mb": 2, "64mb": 2} + reserved={"messages": 511, "256kb": 0, "512kb": 0, "1mb": 0, "2mb": 0, "4mb": 0, "8mb": 0, "16mb": 0, "32mb": 0, "64mb": 1} ) + self._do_test( kphp_options={ - "--workers-num": 4, "--job-workers-ratio": 0.5, - "--job-workers-shared-messages": 150, "--job-workers-shared-memory-size": "50M" + "--workers-num": 2, "--job-workers-ratio": 0.5, + "--job-workers-shared-memory-distribution-weights": "10, 9, 8, 7, 6, 5, 4, 3, 2, 1", + "--job-workers-shared-memory-size": "128M" }, - reserved={"messages": 99, "1mb": 0, "2mb": 0, "4mb": 0, "8mb": 0, "16mb": 0, "32mb": 0, "64mb": 0} + reserved={"messages": 186 + 1, "256kb": 83 + 1, "512kb": 37, "1mb": 16 + 1, "2mb": 6, "4mb": 2 + 1, "8mb": 1, "16mb": 0 + 1, "32mb": 0, "64mb": 0} ) + self._do_test( kphp_options={ - "--workers-num": 10, "--job-workers-ratio": 0.2, - "--job-workers-shared-messages": 10, "--job-workers-shared-memory-size": "500M" + "--workers-num": 2, "--job-workers-ratio": 0.5, + "--job-workers-shared-memory-distribution-weights": "1, 1, 1, 1, 1, 1, 1, 1, 1, 1", + "--job-workers-shared-memory-size": "641M" }, - reserved={"messages": 10, "1mb": 6, "2mb": 2, "4mb": 1, "8mb": 2, "16mb": 1, "32mb": 2, "64mb": 6} + reserved={"messages": 512 + 1, "256kb": 256 + 1, "512kb": 128 + 1, "1mb": 64, "2mb": 32, "4mb": 16, "8mb": 8, "16mb": 4, "32mb": 2, "64mb": 1} )