Skip to content

Commit

Permalink
Job workers: improved shared memory distribution (#881)
Browse files Browse the repository at this point in the history
Implemented new weights based shared memory distribution approach
Reduced shared messages size from 512kb to 128kb
Added two new extra shared memory buffer groups of 256kb and 512kb size
  • Loading branch information
DrDet authored Aug 16, 2023
1 parent a37c782 commit a9aafd0
Show file tree
Hide file tree
Showing 17 changed files with 333 additions and 293 deletions.
8 changes: 1 addition & 7 deletions compiler/pipes/register-kphp-configuration.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,13 +135,11 @@ void RegisterKphpConfiguration::handle_constant_runtime_options(const ClassMembe
register_confdata_blacklist(opt_pair->value());
} else if (*opt_key == confdata_predefined_wildcard_key_) {
register_confdata_predefined_wildcard(opt_pair->value());
} else if (*opt_key == mysql_db_name_key_) {
register_mysql_db_name(opt_pair->value());
} else if (*opt_key == net_dc_mask_key_) {
register_net_dc_mask(opt_pair->value());
} else if (vk::any_of_equal(*opt_key,
warmup_workers_part_key_, warmup_instance_cache_elements_part_key_, warmup_timeout_sec_key_,
oom_handling_memory_ratio_key_)) {
oom_handling_memory_ratio_key_, mysql_db_name_key_, job_workers_shared_memory_distribution_weights_)) {
generic_register_simple_option(opt_pair->value(), *opt_key);
} else {
kphp_error(0, fmt_format("Got unexpected option {}::{}['{}']",
Expand Down Expand Up @@ -186,10 +184,6 @@ void RegisterKphpConfiguration::register_confdata_predefined_wildcard(VertexPtr
}
}

void RegisterKphpConfiguration::register_mysql_db_name(VertexPtr value) const noexcept {
generic_register_simple_option(value, mysql_db_name_key_);
}

void RegisterKphpConfiguration::register_net_dc_mask(VertexPtr value) const noexcept {
auto dc_masks = VertexUtil::get_actual_value(value).try_as<op_array>();
kphp_error_return(dc_masks, fmt_format("{}[{}] must be a constexpr array",
Expand Down
3 changes: 2 additions & 1 deletion compiler/pipes/register-kphp-configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ class RegisterKphpConfiguration final : public SyncPipeF<FunctionPtr> {

void register_confdata_blacklist(VertexPtr value) const noexcept;
void register_confdata_predefined_wildcard(VertexPtr value) const noexcept;
void register_mysql_db_name(VertexPtr value) const noexcept;
void register_net_dc_mask(VertexPtr value) const noexcept;

void handle_constant_function_palette(const ClassMemberConstant &c);
Expand All @@ -43,6 +42,8 @@ class RegisterKphpConfiguration final : public SyncPipeF<FunctionPtr> {

const vk::string_view oom_handling_memory_ratio_key_{"--oom-handling-memory-ratio"};

const vk::string_view job_workers_shared_memory_distribution_weights_{"--job-workers-shared-memory-distribution-weights"};

public:
void execute(FunctionPtr function, DataStream<FunctionPtr> &unused_os) final;
void on_finish(DataStream<FunctionPtr> &os) override;
Expand Down
30 changes: 0 additions & 30 deletions runtime/memory_resource/extra-memory-pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,40 +63,10 @@ class extra_memory_raw_bucket : vk::not_copyable {
return raw_memory_ + index * buffer_size_;
}

static int get_bucket(const extra_memory_pool &pool) noexcept {
return __builtin_ctzll(pool.get_buffer_size()) - 20;
}

static size_t get_size_by_bucket(size_t id) noexcept {
return 1 << (20 + id);
}

private:
uint8_t *raw_memory_{nullptr};
size_t buffers_count_{0};
size_t buffer_size_{0};
};

template<size_t N>
size_t distribute_memory(std::array<extra_memory_raw_bucket, N> &extra_memory, size_t initial_count, uint8_t *raw, size_t size) noexcept {
size_t left_memory = size;
for (size_t i = 0; i != extra_memory.size(); ++i) {
// (2^i) MB
const size_t buffer_size = extra_memory_raw_bucket::get_size_by_bucket(i);
size_t buffers_count = left_memory / buffer_size;
if (i + 1 != extra_memory.size()) {
// (initial_count / 2^i)
buffers_count = std::min(std::max(initial_count >> i, size_t{1}), buffers_count);
const size_t left_memory_for_next = left_memory - buffers_count * buffer_size;
const size_t next_buffer_size = buffer_size << 1;
buffers_count += left_memory_for_next % next_buffer_size >= buffer_size;
}

left_memory -= buffers_count * buffer_size;
extra_memory[i].init(raw, buffers_count, buffer_size);
raw += buffers_count * buffer_size;
}
return left_memory;
}

} // namespace memory_resource
19 changes: 12 additions & 7 deletions server/job-workers/job-message.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,24 @@

namespace job_workers {

// this constant is used for calculating total available messages count:
// messages count = the processes number * JOB_DEFAULT_SHARED_MESSAGES_COUNT_PROCESS_MULTIPLIER
constexpr size_t JOB_DEFAULT_SHARED_MESSAGES_COUNT_PROCESS_MULTIPLIER = 2;
constexpr size_t JOB_SHARED_MESSAGE_SIZE_EXP = 17;
// the size of the job shared message (without extra memory)
constexpr size_t JOB_SHARED_MESSAGE_BYTES = 512 * 1024; // 512KB
constexpr size_t JOB_SHARED_MESSAGE_BYTES = 1 << JOB_SHARED_MESSAGE_SIZE_EXP; // 128KB
// the number of buckets for extra shared memory,
// it is started from (2 * JOB_SHARED_MESSAGE_BYTES) Bytes and double for the next:
// 0 => 1MB, 1 => 2MB, 2 => 4MB, 3 => 8MB, 4 => 16MB, 5 => 32MB, 6 => 64MB
constexpr size_t JOB_EXTRA_MEMORY_BUFFER_BUCKETS = 7;
// 0 => 256KB, 1 => 512KB, 2 => 1MB, 3 => 2MB, 4 => 4MB, 5 => 8MB, 6 => 16MB, 7 => 32MB, 8 => 64MB
constexpr size_t JOB_EXTRA_MEMORY_BUFFER_BUCKETS = 9;
// the default multiplier for getting shared memory limit for job workers messaging:
// the default value for shared memory = the processes number * JOB_DEFAULT_MEMORY_LIMIT_PROCESS_MULTIPLIER
constexpr size_t JOB_DEFAULT_MEMORY_LIMIT_PROCESS_MULTIPLIER = 8 * 1024 * 1024; // 8MB for 1 process

inline int get_extra_shared_memory_buffers_group_idx(size_t size) noexcept {
return __builtin_ctzll(size) - (JOB_SHARED_MESSAGE_SIZE_EXP + 1);
}

inline size_t get_extra_shared_memory_buffer_size(size_t group_idx) noexcept {
return 1 << (JOB_SHARED_MESSAGE_SIZE_EXP + 1 + group_idx);
}

struct JobSharedMemoryPiece;

struct alignas(8) JobMetadata : vk::not_copyable {
Expand Down
4 changes: 3 additions & 1 deletion server/job-workers/job-stats.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ void JobStats::write_stats_to(stats_t *stats) const noexcept {

size_t currently_used = messages.write_stats_to(stats, "workers.job.memory.messages.shared_messages.", JOB_SHARED_MESSAGE_BYTES);
constexpr std::array<const char *, JOB_EXTRA_MEMORY_BUFFER_BUCKETS> extra_memory_prefixes{
"workers.job.memory.messages.extra_buffers.256kb.",
"workers.job.memory.messages.extra_buffers.512kb.",
"workers.job.memory.messages.extra_buffers.1mb.",
"workers.job.memory.messages.extra_buffers.2mb.",
"workers.job.memory.messages.extra_buffers.4mb.",
Expand All @@ -52,7 +54,7 @@ void JobStats::write_stats_to(stats_t *stats) const noexcept {
"workers.job.memory.messages.extra_buffers.64mb.",
};
for (size_t i = 0; i != JOB_EXTRA_MEMORY_BUFFER_BUCKETS; ++i) {
const size_t buffer_size = memory_resource::extra_memory_raw_bucket::get_size_by_bucket(i);
const size_t buffer_size = get_extra_shared_memory_buffer_size(i);
currently_used += extra_memory[i].write_stats_to(stats, extra_memory_prefixes[i], buffer_size);
}

Expand Down
4 changes: 2 additions & 2 deletions server/job-workers/job-worker-server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -262,8 +262,8 @@ bool JobWorkerServer::reply_is_expected() const noexcept {
}

void JobWorkerServer::flush_job_stat() noexcept {
vk::singleton<ServerStats>::get().add_job_stats(job_stat.job_wait_time, job_stat.job_request_max_real_memory_used, job_stat.job_request_max_memory_used,
job_stat.job_response_max_real_memory_used, job_stat.job_response_max_memory_used);
vk::singleton<ServerStats>::get().add_job_stats(job_stat.job_wait_time, job_stat.job_request_max_memory_used, job_stat.job_request_max_real_memory_used,
job_stat.job_response_max_memory_used, job_stat.job_response_max_real_memory_used);
job_stat = {};
}
} // namespace job_workers
85 changes: 56 additions & 29 deletions server/job-workers/shared-memory-manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,56 +19,52 @@ void SharedMemoryManager::init() noexcept {

const size_t processes = vk::singleton<WorkersControl>::get().get_total_workers_count();
assert(processes);
if (!shared_messages_count_) {
auto mul = shared_messages_count_process_multiplier_ ? shared_messages_count_process_multiplier_ : JOB_DEFAULT_SHARED_MESSAGES_COUNT_PROCESS_MULTIPLIER;
shared_messages_count_ = processes * mul;
}
if (!memory_limit_) {
auto mul = per_process_memory_limit_ ? per_process_memory_limit_ : JOB_DEFAULT_MEMORY_LIMIT_PROCESS_MULTIPLIER;
memory_limit_ = processes * mul + sizeof(ControlBlock);
}
auto *raw_mem = static_cast<uint8_t *>(mmap_shared(memory_limit_));
const size_t left_memory = memory_limit_ - sizeof(ControlBlock);
const uint32_t messages_count = std::min(shared_messages_count_, left_memory / sizeof(JobSharedMessage));
const auto *raw_mem_start = raw_mem;
const auto *raw_mem_finish = raw_mem + memory_limit_;

control_block_ = new(raw_mem) ControlBlock{};
raw_mem += sizeof(ControlBlock);
const size_t left_memory = memory_limit_ - sizeof(ControlBlock);

std::array<size_t, 1 + JOB_EXTRA_MEMORY_BUFFER_BUCKETS> shared_memory_group_buffers_counts;
control_block_->stats.unused_memory = calc_shared_memory_buffers_distribution(left_memory, shared_memory_group_buffers_counts);

const uint32_t messages_count = shared_memory_group_buffers_counts[0];
assert(messages_count > 0);
for (uint32_t i = 0; i != messages_count; ++i) {
freelist_put(&control_block_->free_messages, raw_mem);
raw_mem += sizeof(JobSharedMessage);
}

std::array<memory_resource::extra_memory_raw_bucket, JOB_EXTRA_MEMORY_BUFFER_BUCKETS> extra_memory;
control_block_->stats.unused_memory = memory_resource::distribute_memory(extra_memory, shared_messages_count_ / 2, raw_mem,
left_memory - sizeof(JobSharedMessage) * messages_count);
for (size_t i = 0; i != extra_memory.size(); ++i) {
auto &free_extra_buffers = control_block_->free_extra_memory[i];
auto &extra_buffers = extra_memory[i];
for (size_t mem_index = 0; mem_index != extra_buffers.buffers_count(); ++mem_index) {
freelist_put(&free_extra_buffers, extra_buffers.get_extra_pool_raw(mem_index));
for (size_t i = 1; i < shared_memory_buffers_groups_.size(); ++i) {
const auto &cur_g = shared_memory_buffers_groups_[i];
const size_t cur_group_buffers_cnt = shared_memory_group_buffers_counts[i];
for (int j = 0; j < cur_group_buffers_cnt; ++j) {
freelist_put(&control_block_->free_extra_memory[i - 1], raw_mem);
raw_mem += cur_g.buffer_size;
}
control_block_->stats.extra_memory[i].count = extra_buffers.buffers_count();
control_block_->stats.extra_memory[i - 1].count = cur_group_buffers_cnt;
}

assert(raw_mem_start <= raw_mem && raw_mem <= raw_mem_finish);

control_block_->stats.memory_limit = memory_limit_;
control_block_->stats.messages.count = messages_count;
}

bool SharedMemoryManager::set_memory_limit(size_t memory_limit) noexcept {
if (memory_limit < JOB_DEFAULT_SHARED_MESSAGES_COUNT_PROCESS_MULTIPLIER * sizeof(JobSharedMessage) + sizeof(ControlBlock)) {
if (memory_limit < sizeof(JobSharedMessage) + sizeof(ControlBlock)) {
return false;
}
memory_limit_ = memory_limit;
return true;
}

bool SharedMemoryManager::set_shared_messages_count(size_t shared_messages_count) noexcept {
if (shared_messages_count < JOB_DEFAULT_SHARED_MESSAGES_COUNT_PROCESS_MULTIPLIER) {
return false;
}
shared_messages_count_ = shared_messages_count;
return true;
}

bool SharedMemoryManager::set_per_process_memory_limit(size_t per_process_memory_limit) noexcept {
if (per_process_memory_limit < JOB_DEFAULT_MEMORY_LIMIT_PROCESS_MULTIPLIER) {
return false;
Expand All @@ -77,11 +73,13 @@ bool SharedMemoryManager::set_per_process_memory_limit(size_t per_process_memory
return true;
}

bool SharedMemoryManager::set_shared_messages_count_process_multiplier(size_t shared_messages_count_process_multiplier) noexcept {
if (shared_messages_count_process_multiplier < JOB_DEFAULT_SHARED_MESSAGES_COUNT_PROCESS_MULTIPLIER) {
bool SharedMemoryManager::set_shared_memory_distribution_weights(const std::array<double, 1 + JOB_EXTRA_MEMORY_BUFFER_BUCKETS> &weights) noexcept {
if (weights[0] < 1e-6) {
return false;
}
shared_messages_count_process_multiplier_ = shared_messages_count_process_multiplier;
for (size_t i = 0; i < weights.size(); ++i) {
shared_memory_buffers_groups_[i].weight = weights[i];
}
return true;
}

Expand All @@ -97,7 +95,7 @@ void SharedMemoryManager::release_shared_message(JobMetadata *message) noexcept
while (extra_memory->get_pool_payload_size()) {
auto *releasing_extra_memory = extra_memory;
extra_memory = extra_memory->next_in_chain;
const int i = memory_resource::extra_memory_raw_bucket::get_bucket(*releasing_extra_memory);
const int i = get_extra_shared_memory_buffers_group_idx(releasing_extra_memory->get_buffer_size());
assert(i >= 0 && i < control_block_->free_extra_memory.size());
freelist_put(&control_block_->free_extra_memory[i], releasing_extra_memory);
++control_block_->stats.extra_memory[i].released;
Expand Down Expand Up @@ -134,7 +132,7 @@ void SharedMemoryManager::forcibly_release_all_attached_messages() noexcept {
bool SharedMemoryManager::request_extra_memory_for_resource(memory_resource::unsynchronized_pool_resource &resource, size_t required_size) noexcept {
assert(control_block_);
for (size_t i = 0; i != control_block_->free_extra_memory.size(); ++i) {
const size_t buffer_real_size = memory_resource::extra_memory_raw_bucket::get_size_by_bucket(i);
const size_t buffer_real_size = get_extra_shared_memory_buffer_size(i);
const size_t payload_size = memory_resource::extra_memory_pool::get_pool_payload_size(buffer_real_size);
if (payload_size < required_size) {
continue;
Expand All @@ -157,4 +155,33 @@ JobStats &SharedMemoryManager::get_stats() noexcept {
return control_block_->stats;
}

size_t SharedMemoryManager::calc_shared_memory_buffers_distribution(size_t mem_size, std::array<size_t, 1 + JOB_EXTRA_MEMORY_BUFFER_BUCKETS> &group_buffers_counts) const noexcept {
auto calc_group_buffers_count = [&](const shared_memory_buffers_group_info &cur_g) -> size_t {
double sum = 0;
for (auto &g : shared_memory_buffers_groups_) {
sum += g.weight;
}
const double ratio = cur_g.weight / sum;
return static_cast<size_t>(std::floor(mem_size * ratio / cur_g.buffer_size));
};

size_t total_used_mem = 0;
for (size_t i = 0; i < shared_memory_buffers_groups_.size(); ++i) {
auto &g = shared_memory_buffers_groups_[i];
const size_t buffers_cnt = calc_group_buffers_count(g);
group_buffers_counts[i] = buffers_cnt;
total_used_mem += g.buffer_size * buffers_cnt;
}
assert(total_used_mem <= mem_size);
// distribute left memory starting with the largest buffers
size_t unused_mem = mem_size - total_used_mem;
for (size_t i = shared_memory_buffers_groups_.size(); i-- > 0; ) {
auto &g = shared_memory_buffers_groups_[i];
size_t cnt = unused_mem / g.buffer_size;
group_buffers_counts[i] += cnt;
unused_mem = unused_mem % g.buffer_size;
}
return unused_mem;
}

} // namespace job_workers
31 changes: 25 additions & 6 deletions server/job-workers/shared-memory-manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,8 @@ class SharedMemoryManager : vk::not_copyable {
void forcibly_release_all_attached_messages() noexcept;

bool set_memory_limit(size_t memory_limit) noexcept;
bool set_shared_messages_count(size_t shared_messages_count) noexcept;
bool set_per_process_memory_limit(size_t per_process_memory_limit) noexcept;
bool set_shared_messages_count_process_multiplier(size_t shared_messages_count_process_multiplier) noexcept;
bool set_shared_memory_distribution_weights(const std::array<double, 1 + JOB_EXTRA_MEMORY_BUFFER_BUCKETS> &weights) noexcept;

bool request_extra_memory_for_resource(memory_resource::unsynchronized_pool_resource &resource, size_t required_size) noexcept;

Expand All @@ -93,15 +92,36 @@ class SharedMemoryManager : vk::not_copyable {
return control_block_;
}

size_t calc_shared_memory_buffers_distribution(size_t mem_size, std::array<size_t, 1 + JOB_EXTRA_MEMORY_BUFFER_BUCKETS> &group_buffers_counts) const noexcept;

private:
SharedMemoryManager() = default;

friend class vk::singleton<SharedMemoryManager>;

size_t memory_limit_{0};
size_t shared_messages_count_{0};
size_t per_process_memory_limit_{0};
size_t shared_messages_count_process_multiplier_{0};
// weights for distributing shared memory between buffers groups
// quantity of i-th memory piece is calculated like (w[i]/sum(w) * memory_limit_) / memory_piece_size
struct shared_memory_buffers_group_info {
size_t buffer_size;
double weight;
};
std::array<shared_memory_buffers_group_info, 1 + JOB_EXTRA_MEMORY_BUFFER_BUCKETS> shared_memory_buffers_groups_{
{
{1 << JOB_SHARED_MESSAGE_SIZE_EXP, 2}, // 128KB messages
// and extra buffers:
{1 << (JOB_SHARED_MESSAGE_SIZE_EXP + 1), 2}, // 256KB
{1 << (JOB_SHARED_MESSAGE_SIZE_EXP + 2), 2}, // 512KB
{1 << (JOB_SHARED_MESSAGE_SIZE_EXP + 3), 2}, // 1MB
{1 << (JOB_SHARED_MESSAGE_SIZE_EXP + 4), 1}, // 2MB
{1 << (JOB_SHARED_MESSAGE_SIZE_EXP + 5), 1}, // 4MB
{1 << (JOB_SHARED_MESSAGE_SIZE_EXP + 6), 1}, // 8MB
{1 << (JOB_SHARED_MESSAGE_SIZE_EXP + 7), 1}, // 16MB
{1 << (JOB_SHARED_MESSAGE_SIZE_EXP + 8), 1}, // 32MB
{1 << (JOB_SHARED_MESSAGE_SIZE_EXP + 9), 1}, // 64MB
}
};

struct alignas(8) ControlBlock {
ControlBlock() noexcept {
Expand All @@ -115,8 +135,7 @@ class SharedMemoryManager : vk::not_copyable {
std::array<WorkerProcessMeta, WorkersControl::max_workers_count> workers_table{};
freelist_t free_messages{};

// index => (1 << index) MB:
// 0 => 1MB, 1 => 2MB, 2 => 4MB, 3 => 8MB, 4 => 16MB, 5 => 32MB, 6 => 64MB
// 0 => 256KB, 1 => 512KB, 2 => 1MB, 3 => 2MB, 4 => 4MB, 5 => 8MB, 6 => 16MB, 7 => 32MB, 8 => 64MB
std::array<freelist_t, JOB_EXTRA_MEMORY_BUFFER_BUCKETS> free_extra_memory{};
};
ControlBlock *control_block_{nullptr};
Expand Down
Loading

0 comments on commit a9aafd0

Please sign in to comment.