Skip to content

Commit

Permalink
change to optional storages
Browse files Browse the repository at this point in the history
  • Loading branch information
Vadim Sadokhov committed Sep 12, 2023
1 parent 6008466 commit c468fd7
Show file tree
Hide file tree
Showing 7 changed files with 38 additions and 38 deletions.
5 changes: 3 additions & 2 deletions server/job-workers/job-worker-server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ int jobs_server_php_wakeup(connection *c) {
double timeout = worker->enter_lifecycle();

if (timeout == 0) {
worker->~PhpWorker();
php_worker_storage.reset();
jobs_server_at_query_end(c);
} else {
assert(c->pending_queries >= 0 && c->status == conn_wait_net);
Expand Down Expand Up @@ -175,7 +175,8 @@ int JobWorkerServer::job_parse_execute(connection *c) noexcept {

php_query_data_t job_data = job_query_data{job, [](JobSharedMessage *job_response) {
return vk::singleton<JobWorkerServer>::get().send_job_reply(job_response);}};
reinterpret_cast<JobCustomData *>(c->custom_data)->worker = new (&php_worker_storage) PhpWorker(job_worker, c, std::move(job_data), job->job_id, left_job_time);
php_worker_storage.emplace(job_worker, c, std::move(job_data), job->job_id, left_job_time);
reinterpret_cast<JobCustomData *>(c->custom_data)->worker = &php_worker_storage.value();

set_connection_timeout(c, left_job_time);
c->status = conn_wait_net;
Expand Down
18 changes: 9 additions & 9 deletions server/php-engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ command_t *create_command_net_writer(const char *data, int data_len, command_t *
int run_once_count = 1;
int queries_to_recreate_script = 100;

PhpScript *php_script;
std::optional<PhpScript> php_script;

int has_pending_scripts() {
return php_worker_run_flag || pending_http_queue.first_query != (conn_query *)&pending_http_queue;
Expand Down Expand Up @@ -526,7 +526,7 @@ int do_hts_func_wakeup(connection *c, bool flag) {
assert(worker);
double timeout = worker->enter_lifecycle();
if (timeout == 0) {
worker->~PhpWorker();
php_worker_storage.reset();
hts_at_query_end(c, flag);
} else {
assert (timeout > 0);
Expand Down Expand Up @@ -619,8 +619,8 @@ int hts_func_execute(connection *c, int op) {
D->query_flags & QF_KEEPALIVE, inet_sockaddr_address(&c->remote_endpoint), inet_sockaddr_port(&c->remote_endpoint)};

static long long http_script_req_id = 0;
PhpWorker *worker = new (&php_worker_storage) PhpWorker(http_worker, c, std::move(http_data), ++http_script_req_id, script_timeout);
D->extra = worker;
php_worker_storage.emplace(http_worker, c, std::move(http_data), ++http_script_req_id, script_timeout);
D->extra = &php_worker_storage.value();

set_connection_timeout(c, script_timeout);
c->status = conn_wait_net;
Expand All @@ -640,7 +640,7 @@ int hts_func_close(connection *c, int who __attribute__((unused))) {
double timeout = worker->enter_lifecycle();
D->extra = nullptr;
assert ("worker is unfinished after closing connection" && timeout == 0);
worker->~PhpWorker();
php_worker_storage.reset();
}
return 0;
}
Expand Down Expand Up @@ -825,7 +825,7 @@ int rpcx_func_wakeup(connection *c) {
assert(worker);
double timeout = worker->enter_lifecycle();
if (timeout == 0) {
worker->~PhpWorker();
php_worker_storage.reset();
rpcx_at_query_end(c);
} else {
assert (c->pending_queries >= 0 && c->status == conn_wait_net);
Expand All @@ -844,7 +844,7 @@ int rpcx_func_close(connection *c, int who __attribute__((unused))) {
double timeout = worker->enter_lifecycle();
D->extra = nullptr;
assert ("worker is unfinished after closing connection" && timeout == 0);
worker->~PhpWorker();
php_worker_storage.reset();

if (!has_pending_scripts()) {
lease_set_ready();
Expand Down Expand Up @@ -1021,8 +1021,8 @@ int rpcx_execute(connection *c, int op, raw_message *raw) {
php_query_data_t rpc_data = rpc_query_data{std::move(header), reinterpret_cast<int *>(buffer), len / static_cast<int>(sizeof(int)),
D->remote_pid.ip, D->remote_pid.port, D->remote_pid.pid, D->remote_pid.utime};

PhpWorker *worker = new (&php_worker_storage) PhpWorker(run_once ? once_worker : rpc_worker, c, std::move(rpc_data), req_id, actual_script_timeout);
D->extra = worker;
php_worker_storage.emplace(run_once ? once_worker : rpc_worker, c, std::move(rpc_data), req_id, actual_script_timeout);
D->extra = &php_worker_storage.value();

c->status = conn_wait_net;
rpcx_func_wakeup(c);
Expand Down
5 changes: 4 additions & 1 deletion server/php-engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

#pragma once

#include <optional>

#include "common/sanitizer.h"

#include "server/php-queries.h"
Expand Down Expand Up @@ -56,7 +58,8 @@ extern int run_once_count;
extern int queries_to_recreate_script;

class PhpScript;
extern PhpScript *php_script;

extern std::optional<PhpScript> php_script;

void turn_sigterm_on();

Expand Down
2 changes: 0 additions & 2 deletions server/php-runner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,6 @@ DEFINE_VERBOSITY(php_runner);
query_stats_t query_stats;
long long query_stats_id = 1;

alignas(PhpScript) std::byte php_script_storage[sizeof(PhpScript)];

namespace {
//TODO: sometimes I need to call old handlers
//TODO: recheck!
Expand Down
3 changes: 1 addition & 2 deletions server/php-runner.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#pragma once

#include <csetjmp>
#include <optional>

#include "common/dl-utils-lite.h"
#include "common/mixin/not_copyable.h"
Expand Down Expand Up @@ -156,5 +157,3 @@ class PhpScript {
void disable_timeout() noexcept;
void set_timeout(double t) noexcept;
};

alignas(PhpScript) extern std::byte php_script_storage[sizeof(PhpScript)];
41 changes: 20 additions & 21 deletions server/php-worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
#include "server/php-worker.h"
#include "server/server-stats.h"

alignas(PhpWorker) std::byte php_worker_storage[sizeof(PhpWorker)];
std::optional<PhpWorker> php_worker_storage;

PhpWorker *active_worker = nullptr;

Expand All @@ -36,7 +36,7 @@ double PhpWorker::enter_lifecycle() noexcept {
on_wakeup();

tvkprintf(php_runner, 3, "PHP-worker enter lifecycle [php-script state = %d, conn status = %d] lifecycle [req_id = %016llx]\n",
php_script ? static_cast<int>(php_script->state) : -1, conn->status, req_id);
php_script.value(). static_cast<int>(php_script.value().state) : -1, conn->status, req_id);
paused = false;
do {
switch (state) {
Expand Down Expand Up @@ -70,7 +70,7 @@ double PhpWorker::enter_lifecycle() noexcept {
} while (!paused);

tvkprintf(php_runner, 3, "PHP-worker [php-script state = %d, conn status = %d] return in net reactor [req_id = %016llx]\n",
php_script ? static_cast<int>(php_script->state) : -1, conn->status, req_id);
php_script.value(). static_cast<int>(php_script.value().state) : -1, conn->status, req_id);
assert(conn->status == conn_wait_net);
return get_timeout();
}
Expand All @@ -90,7 +90,7 @@ void PhpWorker::on_wakeup() noexcept {
if (wakeup_flag || (wakeup_time != 0 && wakeup_time <= precise_now)) {
waiting = 0;
wakeup_time = 0;
// php_script_query_answered (php_script);
// php_script_query_answered (php_script.value().
}
}
wakeup_flag = 0;
Expand Down Expand Up @@ -157,13 +157,13 @@ void PhpWorker::state_init_script() noexcept {

script_t *script = get_script();
dl_assert(script != nullptr, "failed to get script");
if (php_script == nullptr) {
php_script = new (&php_script_storage) PhpScript(max_memory, oom_handling_memory_ratio, 8 << 20);
if (!php_script.has_value()) {
php_script.emplace(max_memory, oom_handling_memory_ratio, 8 << 20);
}
dl::init_critical_section();
php_script->init(script, data);
php_script.value().init(script, data);
data = null_query_data{};
php_script->set_timeout(timeout);
php_script.value().set_timeout(timeout);
state = phpq_run;
}

Expand Down Expand Up @@ -220,11 +220,11 @@ void PhpWorker::state_run() noexcept {
while (f) {
if (terminate_flag) {
tvkprintf(php_runner, 1, "PHP-worker terminate PHP-script [req_id = %016llx]\n", req_id);
php_script->terminate(error_message, terminate_reason);
php_script.value().terminate(error_message, terminate_reason);
}

// fprintf (stderr, "state = %d, f = %d\n", php_script_get_state (php_script), f);
switch (php_script->state) {
// fprintf (stderr, "state = %d, f = %d\n", php_script_get_state (php_script.value(). f);
switch (php_script.value().state) {
case run_state_t::ready: {
vk::singleton<ServerStats>::get().set_running_worker_status();
if (waiting) {
Expand All @@ -236,7 +236,7 @@ void PhpWorker::state_run() noexcept {
break;
}
tvkprintf(php_runner, 3, "PHP-worker before swap context [req_id = %016llx]\n", req_id);
php_script->iterate();
php_script.value().iterate();
tvkprintf(php_runner, 3, "PHP-worker after swap context [req_id = %016llx]\n", req_id);;
wait(0); // check for net events
break;
Expand Down Expand Up @@ -269,7 +269,7 @@ void PhpWorker::state_run() noexcept {
// in case the error happened when malloc was replaced
dl::rollback_malloc_replacement();
}
php_script->finish();
php_script.value().finish();

if (conn != nullptr) {
switch (mode) {
Expand All @@ -280,12 +280,12 @@ void PhpWorker::state_run() noexcept {
break;
case rpc_worker:
if (!rpc_stored) {
server_rpc_error(conn, req_id, -504, php_script->error_message);
server_rpc_error(conn, req_id, -504, php_script.value().error_message);
}
break;
case job_worker: {
const char *error = php_script->error_message;
int error_code = job_workers::server_php_script_error_offset - static_cast<int>(php_script->error_type);
const char *error = php_script.value().error_message;
int error_code = job_workers::server_php_script_error_offset - static_cast<int>(php_script.value().error_type);
auto &job_server = vk::singleton<job_workers::JobWorkerServer>::get();
if (job_server.reply_is_expected()) {
job_server.store_job_response_error(error, error_code);
Expand All @@ -302,9 +302,9 @@ void PhpWorker::state_run() noexcept {
}
case run_state_t::finished: {
tvkprintf(php_runner, 1, "PHP-worker finish PHP-script [req_id = %016llx]\n", req_id);
script_result *res = php_script->res;
script_result *res = php_script.value().res;
set_result(res);
php_script->finish();
php_script.value().finish();

state = phpq_free_script;
f = 0;
Expand Down Expand Up @@ -420,9 +420,8 @@ void PhpWorker::state_free_script() noexcept {

static int finished_queries = 0;
if ((++finished_queries) % queries_to_recreate_script == 0
|| (!use_madvise_dontneed && php_script->memory_get_total_usage() > memory_used_to_recreate_script)) {
php_script->~PhpScript();
php_script = nullptr;
|| (!use_madvise_dontneed && php_script.value().memory_get_total_usage() > memory_used_to_recreate_script)) {
php_script.reset();
finished_queries = 0;
}

Expand Down
2 changes: 1 addition & 1 deletion server/php-worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,6 @@ class PhpWorker {
void state_finish() noexcept;
};

alignas(PhpWorker) extern std::byte php_worker_storage[sizeof(PhpWorker)];
extern std::optional<PhpWorker> php_worker_storage;

extern PhpWorker *active_worker;

0 comments on commit c468fd7

Please sign in to comment.