Skip to content

Commit

Permalink
reduce PhpWorker and PhpScript heap allocations (#895)
Browse files Browse the repository at this point in the history
add static storages for PhpWorker and PhpScript
  • Loading branch information
astrophysik authored Oct 17, 2023
1 parent 7fea206 commit 29db917
Show file tree
Hide file tree
Showing 15 changed files with 111 additions and 300 deletions.
86 changes: 33 additions & 53 deletions runtime/interface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "common/algorithms/string-algorithms.h"
#include "common/macos-ports.h"
#include "common/tl/constants/common.h"
#include "common/wrappers/overloaded.h"

#include "net/net-connections.h"
#include "runtime/array_functions.h"
Expand Down Expand Up @@ -402,19 +403,19 @@ void f$setcookie(const string &name, const string &value, int64_t expire, const
}

int64_t f$ignore_user_abort(Optional<bool> enable) {
php_assert(active_worker != nullptr && active_worker->conn != nullptr);
php_assert(php_worker.has_value() && php_worker->conn != nullptr);
if (enable.is_null()) {
return ignore_level;
} else if (enable.val()) {
active_worker->conn->ignored = true;
php_worker->conn->ignored = true;
return ignore_level++;
} else {
int prev = ignore_level > 0 ? ignore_level-- : 0;
if (ignore_level == 0) {
active_worker->conn->ignored = false;
php_worker->conn->ignored = false;
}
if (active_worker->conn->interrupted && !active_worker->conn->ignored) {
active_worker->conn->status = conn_error;
if (php_worker->conn->interrupted && !php_worker->conn->ignored) {
php_worker->conn->status = conn_error;
f$exit(1);
}
return prev;
Expand Down Expand Up @@ -558,13 +559,13 @@ static int ob_merge_buffers() {
}

void f$flush() {
php_assert(ob_cur_buffer >= 0 && active_worker != nullptr);
php_assert(ob_cur_buffer >= 0 && php_worker.has_value());

string_buffer const * http_body = compress_http_query_body(&oub[ob_system_level]);
string_buffer const * http_headers = nullptr;
if (!active_worker->flushed_http_connection) {
if (!php_worker->flushed_http_connection) {
http_headers = get_headers();
active_worker->flushed_http_connection = true;
php_worker->flushed_http_connection = true;
}
http_send_immediate_response(http_headers ? http_headers->buffer() : nullptr, http_headers ? http_headers->size() : 0,
http_body->buffer(), http_body->size());
Expand All @@ -574,7 +575,7 @@ void f$flush() {

void f$fastcgi_finish_request(int64_t exit_code) {
int const ob_total_buffer = ob_merge_buffers();
if (active_worker != nullptr && active_worker->flushed_http_connection) {
if (php_worker.has_value() && php_worker->flushed_http_connection) {
string const raw_response = oub[ob_total_buffer].str();
http_set_result(nullptr, 0, raw_response.c_str(), raw_response.size(), static_cast<int32_t>(exit_code));
php_assert (0);
Expand Down Expand Up @@ -1544,8 +1545,8 @@ static void save_rpc_query_headers(const tl_query_header_t &header) {
}
}

static void init_superglobals(const http_query_data &http_data, const rpc_query_data &rpc_data, const job_query_data &job_data) {
rpc_parse(rpc_data.data, rpc_data.len);
static void init_superglobals_impl(const http_query_data &http_data, const rpc_query_data &rpc_data, const job_query_data &job_data) {
rpc_parse(rpc_data.data.data(), rpc_data.data.size());

reset_superglobals();

Expand Down Expand Up @@ -1735,10 +1736,10 @@ static void init_superglobals(const http_query_data &http_data, const rpc_query_
if (rpc_data.header.qid) {
v$_SERVER.set_value(string("RPC_REQUEST_ID"), f$strval(static_cast<int64_t>(rpc_data.header.qid)));
save_rpc_query_headers(rpc_data.header);
v$_SERVER.set_value(string("RPC_REMOTE_IP"), static_cast<int>(rpc_data.ip));
v$_SERVER.set_value(string("RPC_REMOTE_PORT"), static_cast<int>(rpc_data.port));
v$_SERVER.set_value(string("RPC_REMOTE_PID"), static_cast<int>(rpc_data.pid));
v$_SERVER.set_value(string("RPC_REMOTE_UTIME"), rpc_data.utime);
v$_SERVER.set_value(string("RPC_REMOTE_IP"), static_cast<int>(rpc_data.remote_pid.ip));
v$_SERVER.set_value(string("RPC_REMOTE_PORT"), static_cast<int>(rpc_data.remote_pid.port));
v$_SERVER.set_value(string("RPC_REMOTE_PID"), static_cast<int>(rpc_data.remote_pid.pid));
v$_SERVER.set_value(string("RPC_REMOTE_UTIME"), rpc_data.remote_pid.utime);
}
is_head_query = false;
if (http_data.request_method_len) {
Expand Down Expand Up @@ -1801,47 +1802,26 @@ static http_query_data empty_http_data;
static rpc_query_data empty_rpc_data;
static job_query_data empty_job_data;

void init_superglobals(php_query_data *data) {
http_query_data *http_data;
rpc_query_data *rpc_data;
job_query_data *job_data;
if (data != nullptr) {
if (data->rpc_data != nullptr) {
php_assert (data->http_data == nullptr);
php_assert (data->job_data == nullptr);
void init_superglobals(const php_query_data_t &data) {
// init superglobals depending on the request type
std::visit(overloaded{
[](const rpc_query_data &rpc_data) {
query_type = QUERY_TYPE_RPC;

http_data = &empty_http_data;
rpc_data = data->rpc_data;
job_data = &empty_job_data;
} else if (data->http_data != nullptr) {
php_assert (data->rpc_data == nullptr);
php_assert (data->job_data == nullptr);
init_superglobals_impl(empty_http_data, rpc_data, empty_job_data);
},
[](const http_query_data &http_data) {
query_type = QUERY_TYPE_HTTP;

http_data = data->http_data;
rpc_data = &empty_rpc_data;
job_data = &empty_job_data;
} else {
php_assert (data->job_data != nullptr);
php_assert (data->rpc_data == nullptr);
php_assert (data->http_data == nullptr);

init_superglobals_impl(http_data, empty_rpc_data, empty_job_data);
},
[](const job_query_data &job_data) {
query_type = QUERY_TYPE_JOB;

http_data = &empty_http_data;
rpc_data = &empty_rpc_data;
job_data = data->job_data;
init_superglobals_impl(empty_http_data, empty_rpc_data, job_data);
},
[](const null_query_data &) {
query_type = QUERY_TYPE_CONSOLE;
init_superglobals_impl(empty_http_data, empty_rpc_data, empty_job_data);
}
} else {
query_type = QUERY_TYPE_CONSOLE;

http_data = &empty_http_data;
rpc_data = &empty_rpc_data;
job_data = &empty_job_data;
}

init_superglobals(*http_data, *rpc_data, *job_data);
}, data);
}

double f$get_net_time() {
Expand Down Expand Up @@ -2456,7 +2436,7 @@ void global_init_script_allocator() {
dl::global_init_script_allocator();
}

void init_runtime_environment(php_query_data *data, void *mem, size_t script_mem_size, size_t oom_handling_mem_size) {
void init_runtime_environment(const php_query_data_t &data, void *mem, size_t script_mem_size, size_t oom_handling_mem_size) {
dl::init_script_allocator(mem, script_mem_size, oom_handling_mem_size);
reset_global_interface_vars();
init_runtime_libs();
Expand Down
4 changes: 2 additions & 2 deletions runtime/interface.h
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ bool f$is_uploaded_file(const string &filename);
bool f$move_uploaded_file(const string &oldname, const string &newname);


void init_superglobals(php_query_data *data);
void init_superglobals(const php_query_data_t &data);


double f$get_net_time();
Expand Down Expand Up @@ -204,7 +204,7 @@ Optional<array<mixed>> f$getopt(const string &options, array<string> longopts =
void global_init_runtime_libs();
void global_init_script_allocator();

void init_runtime_environment(php_query_data *data, void *mem, size_t script_mem_size, size_t oom_handling_mem_size = 0);
void init_runtime_environment(const php_query_data_t &data, void *mem, size_t script_mem_size, size_t oom_handling_mem_size = 0);

void free_runtime_environment();

Expand Down
10 changes: 5 additions & 5 deletions server/job-workers/job-worker-server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ int jobs_server_php_wakeup(connection *c) {
double timeout = worker->enter_lifecycle();

if (timeout == 0) {
delete worker;
php_worker.reset();
jobs_server_at_query_end(c);
} else {
assert(c->pending_queries >= 0 && c->status == conn_wait_net);
Expand Down Expand Up @@ -173,10 +173,10 @@ int JobWorkerServer::job_parse_execute(connection *c) noexcept {
job_stat.job_request_max_real_memory_used = job_memory_stats.max_real_memory_used;
job_stat.job_request_max_memory_used = job_memory_stats.max_memory_used;

job_query_data *job_data = job_query_data_create(job, [](JobSharedMessage *job_response) {
return vk::singleton<JobWorkerServer>::get().send_job_reply(job_response);
});
reinterpret_cast<JobCustomData *>(c->custom_data)->worker = new PhpWorker(job_worker, c, nullptr, nullptr, job_data, job->job_id, left_job_time);
php_query_data_t job_data = job_query_data{job, [](JobSharedMessage *job_response) {
return vk::singleton<JobWorkerServer>::get().send_job_reply(job_response);}};
php_worker.emplace(job_worker, c, std::move(job_data), job->job_id, left_job_time);
reinterpret_cast<JobCustomData *>(c->custom_data)->worker = &php_worker.value();

set_connection_timeout(c, left_job_time);
c->status = conn_wait_net;
Expand Down
45 changes: 21 additions & 24 deletions server/php-engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ command_t *create_command_net_writer(const char *data, int data_len, command_t *
int run_once_count = 1;
int queries_to_recreate_script = 100;

PhpScript *php_script;
std::optional<PhpScript> php_script;

int has_pending_scripts() {
return php_worker_run_flag || pending_http_queue.first_query != (conn_query *)&pending_http_queue;
Expand Down Expand Up @@ -354,14 +354,14 @@ void on_net_event(int event_status) {
if (event_status == 0) {
return;
}
assert (active_worker != nullptr);
assert (php_worker.has_value());
if (event_status < 0) {
active_worker->terminate(0, script_error_t::net_event_error, "memory limit on net event");
active_worker->wakeup();
php_worker->terminate(0, script_error_t::net_event_error, "memory limit on net event");
php_worker->wakeup();
return;
}
if (active_worker->waiting) {
active_worker->wakeup();
if (php_worker->waiting) {
php_worker->wakeup();
}
}

Expand Down Expand Up @@ -525,7 +525,7 @@ int do_hts_func_wakeup(connection *c, bool flag) {
assert(worker);
double timeout = worker->enter_lifecycle();
if (timeout == 0) {
delete worker;
php_worker.reset();
hts_at_query_end(c, flag);
} else {
assert (timeout > 0);
Expand Down Expand Up @@ -613,14 +613,13 @@ int hts_func_execute(connection *c, int op) {
}

/** save query here **/
http_query_data *http_data = http_query_data_create(qUri, qUriLen, qGet, qGetLen, qHeaders, qHeadersLen, qPost,
qPostLen, query_type_str, D->query_flags & QF_KEEPALIVE,
inet_sockaddr_address(&c->remote_endpoint),
inet_sockaddr_port(&c->remote_endpoint));
php_query_data_t http_data = http_query_data{qUri, qGet, qHeaders, qPost, query_type_str,
qUriLen, qGetLen, qHeadersLen, qPostLen, static_cast<int>(strlen(query_type_str)),
D->query_flags & QF_KEEPALIVE, inet_sockaddr_address(&c->remote_endpoint), inet_sockaddr_port(&c->remote_endpoint)};

static long long http_script_req_id = 0;
PhpWorker *worker = new PhpWorker(http_worker, c, http_data, nullptr, nullptr, ++http_script_req_id, script_timeout);
D->extra = worker;
php_worker.emplace(http_worker, c, std::move(http_data), ++http_script_req_id, script_timeout);
D->extra = &php_worker.value();

set_connection_timeout(c, script_timeout);
c->status = conn_wait_net;
Expand All @@ -640,7 +639,7 @@ int hts_func_close(connection *c, int who __attribute__((unused))) {
double timeout = worker->enter_lifecycle();
D->extra = nullptr;
assert ("worker is unfinished after closing connection" && timeout == 0);
delete worker;
php_worker.reset();
}
return 0;
}
Expand Down Expand Up @@ -825,7 +824,7 @@ int rpcx_func_wakeup(connection *c) {
assert(worker);
double timeout = worker->enter_lifecycle();
if (timeout == 0) {
delete worker;
php_worker.reset();
rpcx_at_query_end(c);
} else {
assert (c->pending_queries >= 0 && c->status == conn_wait_net);
Expand All @@ -844,7 +843,7 @@ int rpcx_func_close(connection *c, int who __attribute__((unused))) {
double timeout = worker->enter_lifecycle();
D->extra = nullptr;
assert ("worker is unfinished after closing connection" && timeout == 0);
delete worker;
php_worker.reset();

if (!has_pending_scripts()) {
lease_set_ready();
Expand Down Expand Up @@ -984,7 +983,7 @@ int rpcx_execute(connection *c, int op, raw_message *raw) {
int64_t left_bytes_without_headers = tl_fetch_unread();

len -= (left_bytes_with_headers - left_bytes_without_headers);
assert(len % 4 == 0);
assert(len % sizeof(int) == 0);

long long req_id = header.qid;

Expand All @@ -1010,19 +1009,17 @@ int rpcx_execute(connection *c, int op, raw_message *raw) {
double actual_script_timeout = custom_settings.has_timeout() ? normalize_script_timeout(custom_settings.php_timeout_ms / 1000.0) : script_timeout;
set_connection_timeout(c, actual_script_timeout);

char buf[len + 1];
auto fetched_bytes = tl_fetch_data(buf, len);
std::vector<int> buffer(len / sizeof(int));
auto fetched_bytes = tl_fetch_data(buffer.data(), len);
if (fetched_bytes == -1) {
client_rpc_error(c, req_id, tl_fetch_error_code(), tl_fetch_error_string());
return 0;
}
assert(fetched_bytes == len);
auto *D = TCP_RPC_DATA(c);
rpc_query_data *rpc_data = rpc_query_data_create(std::move(header), reinterpret_cast<int *>(buf), len / static_cast<int>(sizeof(int)), D->remote_pid.ip,
D->remote_pid.port, D->remote_pid.pid, D->remote_pid.utime);

PhpWorker *worker = new PhpWorker(run_once ? once_worker : rpc_worker, c, nullptr, rpc_data, nullptr, req_id, actual_script_timeout);
D->extra = worker;
php_query_data_t rpc_data = rpc_query_data{std::move(header), std::move(buffer), D->remote_pid};
php_worker.emplace(run_once ? once_worker : rpc_worker, c, std::move(rpc_data), req_id, actual_script_timeout);
D->extra = &php_worker.value();

c->status = conn_wait_net;
rpcx_func_wakeup(c);
Expand Down
5 changes: 4 additions & 1 deletion server/php-engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

#pragma once

#include <optional>

#include "common/sanitizer.h"

#include "server/php-queries.h"
Expand Down Expand Up @@ -56,7 +58,8 @@ extern int run_once_count;
extern int queries_to_recreate_script;

class PhpScript;
extern PhpScript *php_script;

extern std::optional<PhpScript> php_script;

void turn_sigterm_on();

Expand Down
10 changes: 5 additions & 5 deletions server/php-queries.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -946,11 +946,11 @@ void db_run_query(int host_num, const char *request, int request_len, int timeou
}

void http_send_immediate_response(const char *headers, int headers_len, const char *body, int body_len) {
php_assert(active_worker != nullptr);
if (active_worker->mode == http_worker) {
write_out(&active_worker->conn->Out, headers, headers_len);
write_out(&active_worker->conn->Out, body, body_len);
flush_connection_output(active_worker->conn);
php_assert(php_worker.has_value());
if (php_worker->mode == http_worker) {
write_out(&php_worker->conn->Out, headers, headers_len);
write_out(&php_worker->conn->Out, body, body_len);
flush_connection_output(php_worker->conn);
} else {
php_warning("Immediate HTTP response available only from HTTP worker");
}
Expand Down
Loading

0 comments on commit 29db917

Please sign in to comment.