Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

reduce PhpWorker and PhpScript heap allocations #895

Merged
merged 3 commits into from
Oct 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 33 additions & 53 deletions runtime/interface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "common/algorithms/string-algorithms.h"
#include "common/macos-ports.h"
#include "common/tl/constants/common.h"
#include "common/wrappers/overloaded.h"

#include "net/net-connections.h"
#include "runtime/array_functions.h"
Expand Down Expand Up @@ -402,19 +403,19 @@ void f$setcookie(const string &name, const string &value, int64_t expire, const
}

int64_t f$ignore_user_abort(Optional<bool> enable) {
php_assert(active_worker != nullptr && active_worker->conn != nullptr);
php_assert(php_worker.has_value() && php_worker->conn != nullptr);
if (enable.is_null()) {
return ignore_level;
} else if (enable.val()) {
active_worker->conn->ignored = true;
php_worker->conn->ignored = true;
return ignore_level++;
} else {
int prev = ignore_level > 0 ? ignore_level-- : 0;
if (ignore_level == 0) {
active_worker->conn->ignored = false;
php_worker->conn->ignored = false;
}
if (active_worker->conn->interrupted && !active_worker->conn->ignored) {
active_worker->conn->status = conn_error;
if (php_worker->conn->interrupted && !php_worker->conn->ignored) {
php_worker->conn->status = conn_error;
f$exit(1);
}
return prev;
Expand Down Expand Up @@ -558,13 +559,13 @@ static int ob_merge_buffers() {
}

void f$flush() {
php_assert(ob_cur_buffer >= 0 && active_worker != nullptr);
php_assert(ob_cur_buffer >= 0 && php_worker.has_value());

string_buffer const * http_body = compress_http_query_body(&oub[ob_system_level]);
string_buffer const * http_headers = nullptr;
if (!active_worker->flushed_http_connection) {
if (!php_worker->flushed_http_connection) {
http_headers = get_headers();
active_worker->flushed_http_connection = true;
php_worker->flushed_http_connection = true;
}
http_send_immediate_response(http_headers ? http_headers->buffer() : nullptr, http_headers ? http_headers->size() : 0,
http_body->buffer(), http_body->size());
Expand All @@ -574,7 +575,7 @@ void f$flush() {

void f$fastcgi_finish_request(int64_t exit_code) {
int const ob_total_buffer = ob_merge_buffers();
if (active_worker != nullptr && active_worker->flushed_http_connection) {
if (php_worker.has_value() && php_worker->flushed_http_connection) {
string const raw_response = oub[ob_total_buffer].str();
http_set_result(nullptr, 0, raw_response.c_str(), raw_response.size(), static_cast<int32_t>(exit_code));
php_assert (0);
Expand Down Expand Up @@ -1544,8 +1545,8 @@ static void save_rpc_query_headers(const tl_query_header_t &header) {
}
}

static void init_superglobals(const http_query_data &http_data, const rpc_query_data &rpc_data, const job_query_data &job_data) {
rpc_parse(rpc_data.data, rpc_data.len);
static void init_superglobals_impl(const http_query_data &http_data, const rpc_query_data &rpc_data, const job_query_data &job_data) {
rpc_parse(rpc_data.data.data(), rpc_data.data.size());
DrDet marked this conversation as resolved.
Show resolved Hide resolved

reset_superglobals();

Expand Down Expand Up @@ -1735,10 +1736,10 @@ static void init_superglobals(const http_query_data &http_data, const rpc_query_
if (rpc_data.header.qid) {
v$_SERVER.set_value(string("RPC_REQUEST_ID"), f$strval(static_cast<int64_t>(rpc_data.header.qid)));
save_rpc_query_headers(rpc_data.header);
v$_SERVER.set_value(string("RPC_REMOTE_IP"), static_cast<int>(rpc_data.ip));
v$_SERVER.set_value(string("RPC_REMOTE_PORT"), static_cast<int>(rpc_data.port));
v$_SERVER.set_value(string("RPC_REMOTE_PID"), static_cast<int>(rpc_data.pid));
v$_SERVER.set_value(string("RPC_REMOTE_UTIME"), rpc_data.utime);
v$_SERVER.set_value(string("RPC_REMOTE_IP"), static_cast<int>(rpc_data.remote_pid.ip));
v$_SERVER.set_value(string("RPC_REMOTE_PORT"), static_cast<int>(rpc_data.remote_pid.port));
v$_SERVER.set_value(string("RPC_REMOTE_PID"), static_cast<int>(rpc_data.remote_pid.pid));
v$_SERVER.set_value(string("RPC_REMOTE_UTIME"), rpc_data.remote_pid.utime);
}
is_head_query = false;
if (http_data.request_method_len) {
Expand Down Expand Up @@ -1801,47 +1802,26 @@ static http_query_data empty_http_data;
static rpc_query_data empty_rpc_data;
static job_query_data empty_job_data;

void init_superglobals(php_query_data *data) {
http_query_data *http_data;
rpc_query_data *rpc_data;
job_query_data *job_data;
if (data != nullptr) {
if (data->rpc_data != nullptr) {
php_assert (data->http_data == nullptr);
php_assert (data->job_data == nullptr);
void init_superglobals(const php_query_data_t &data) {
// init superglobals depending on the request type
std::visit(overloaded{
[](const rpc_query_data &rpc_data) {
query_type = QUERY_TYPE_RPC;

http_data = &empty_http_data;
rpc_data = data->rpc_data;
job_data = &empty_job_data;
} else if (data->http_data != nullptr) {
php_assert (data->rpc_data == nullptr);
php_assert (data->job_data == nullptr);
init_superglobals_impl(empty_http_data, rpc_data, empty_job_data);
},
[](const http_query_data &http_data) {
query_type = QUERY_TYPE_HTTP;

http_data = data->http_data;
rpc_data = &empty_rpc_data;
job_data = &empty_job_data;
} else {
php_assert (data->job_data != nullptr);
php_assert (data->rpc_data == nullptr);
php_assert (data->http_data == nullptr);

init_superglobals_impl(http_data, empty_rpc_data, empty_job_data);
},
[](const job_query_data &job_data) {
query_type = QUERY_TYPE_JOB;

http_data = &empty_http_data;
rpc_data = &empty_rpc_data;
job_data = data->job_data;
init_superglobals_impl(empty_http_data, empty_rpc_data, job_data);
},
[](const null_query_data &) {
query_type = QUERY_TYPE_CONSOLE;
init_superglobals_impl(empty_http_data, empty_rpc_data, empty_job_data);
}
} else {
query_type = QUERY_TYPE_CONSOLE;

http_data = &empty_http_data;
rpc_data = &empty_rpc_data;
job_data = &empty_job_data;
}

init_superglobals(*http_data, *rpc_data, *job_data);
}, data);
}

double f$get_net_time() {
Expand Down Expand Up @@ -2456,7 +2436,7 @@ void global_init_script_allocator() {
dl::global_init_script_allocator();
}

void init_runtime_environment(php_query_data *data, void *mem, size_t script_mem_size, size_t oom_handling_mem_size) {
void init_runtime_environment(const php_query_data_t &data, void *mem, size_t script_mem_size, size_t oom_handling_mem_size) {
dl::init_script_allocator(mem, script_mem_size, oom_handling_mem_size);
reset_global_interface_vars();
init_runtime_libs();
Expand Down
4 changes: 2 additions & 2 deletions runtime/interface.h
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ bool f$is_uploaded_file(const string &filename);
bool f$move_uploaded_file(const string &oldname, const string &newname);


void init_superglobals(php_query_data *data);
void init_superglobals(const php_query_data_t &data);


double f$get_net_time();
Expand Down Expand Up @@ -204,7 +204,7 @@ Optional<array<mixed>> f$getopt(const string &options, array<string> longopts =
void global_init_runtime_libs();
void global_init_script_allocator();

void init_runtime_environment(php_query_data *data, void *mem, size_t script_mem_size, size_t oom_handling_mem_size = 0);
void init_runtime_environment(const php_query_data_t &data, void *mem, size_t script_mem_size, size_t oom_handling_mem_size = 0);

void free_runtime_environment();

Expand Down
10 changes: 5 additions & 5 deletions server/job-workers/job-worker-server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ int jobs_server_php_wakeup(connection *c) {
double timeout = worker->enter_lifecycle();

if (timeout == 0) {
delete worker;
php_worker.reset();
jobs_server_at_query_end(c);
} else {
assert(c->pending_queries >= 0 && c->status == conn_wait_net);
Expand Down Expand Up @@ -173,10 +173,10 @@ int JobWorkerServer::job_parse_execute(connection *c) noexcept {
job_stat.job_request_max_real_memory_used = job_memory_stats.max_real_memory_used;
job_stat.job_request_max_memory_used = job_memory_stats.max_memory_used;

job_query_data *job_data = job_query_data_create(job, [](JobSharedMessage *job_response) {
return vk::singleton<JobWorkerServer>::get().send_job_reply(job_response);
});
reinterpret_cast<JobCustomData *>(c->custom_data)->worker = new PhpWorker(job_worker, c, nullptr, nullptr, job_data, job->job_id, left_job_time);
php_query_data_t job_data = job_query_data{job, [](JobSharedMessage *job_response) {
return vk::singleton<JobWorkerServer>::get().send_job_reply(job_response);}};
php_worker.emplace(job_worker, c, std::move(job_data), job->job_id, left_job_time);
reinterpret_cast<JobCustomData *>(c->custom_data)->worker = &php_worker.value();

set_connection_timeout(c, left_job_time);
c->status = conn_wait_net;
Expand Down
45 changes: 21 additions & 24 deletions server/php-engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ command_t *create_command_net_writer(const char *data, int data_len, command_t *
int run_once_count = 1;
int queries_to_recreate_script = 100;

PhpScript *php_script;
std::optional<PhpScript> php_script;

int has_pending_scripts() {
return php_worker_run_flag || pending_http_queue.first_query != (conn_query *)&pending_http_queue;
Expand Down Expand Up @@ -354,14 +354,14 @@ void on_net_event(int event_status) {
if (event_status == 0) {
return;
}
assert (active_worker != nullptr);
assert (php_worker.has_value());
if (event_status < 0) {
active_worker->terminate(0, script_error_t::net_event_error, "memory limit on net event");
active_worker->wakeup();
php_worker->terminate(0, script_error_t::net_event_error, "memory limit on net event");
php_worker->wakeup();
return;
}
if (active_worker->waiting) {
active_worker->wakeup();
if (php_worker->waiting) {
php_worker->wakeup();
}
}

Expand Down Expand Up @@ -525,7 +525,7 @@ int do_hts_func_wakeup(connection *c, bool flag) {
assert(worker);
double timeout = worker->enter_lifecycle();
if (timeout == 0) {
delete worker;
php_worker.reset();
hts_at_query_end(c, flag);
} else {
assert (timeout > 0);
Expand Down Expand Up @@ -613,14 +613,13 @@ int hts_func_execute(connection *c, int op) {
}

/** save query here **/
http_query_data *http_data = http_query_data_create(qUri, qUriLen, qGet, qGetLen, qHeaders, qHeadersLen, qPost,
qPostLen, query_type_str, D->query_flags & QF_KEEPALIVE,
inet_sockaddr_address(&c->remote_endpoint),
inet_sockaddr_port(&c->remote_endpoint));
php_query_data_t http_data = http_query_data{qUri, qGet, qHeaders, qPost, query_type_str,
qUriLen, qGetLen, qHeadersLen, qPostLen, static_cast<int>(strlen(query_type_str)),
D->query_flags & QF_KEEPALIVE, inet_sockaddr_address(&c->remote_endpoint), inet_sockaddr_port(&c->remote_endpoint)};

static long long http_script_req_id = 0;
PhpWorker *worker = new PhpWorker(http_worker, c, http_data, nullptr, nullptr, ++http_script_req_id, script_timeout);
D->extra = worker;
php_worker.emplace(http_worker, c, std::move(http_data), ++http_script_req_id, script_timeout);
D->extra = &php_worker.value();

set_connection_timeout(c, script_timeout);
c->status = conn_wait_net;
Expand All @@ -640,7 +639,7 @@ int hts_func_close(connection *c, int who __attribute__((unused))) {
double timeout = worker->enter_lifecycle();
D->extra = nullptr;
assert ("worker is unfinished after closing connection" && timeout == 0);
delete worker;
php_worker.reset();
}
return 0;
}
Expand Down Expand Up @@ -825,7 +824,7 @@ int rpcx_func_wakeup(connection *c) {
assert(worker);
double timeout = worker->enter_lifecycle();
if (timeout == 0) {
delete worker;
php_worker.reset();
rpcx_at_query_end(c);
} else {
assert (c->pending_queries >= 0 && c->status == conn_wait_net);
Expand All @@ -844,7 +843,7 @@ int rpcx_func_close(connection *c, int who __attribute__((unused))) {
double timeout = worker->enter_lifecycle();
D->extra = nullptr;
assert ("worker is unfinished after closing connection" && timeout == 0);
delete worker;
php_worker.reset();

if (!has_pending_scripts()) {
lease_set_ready();
Expand Down Expand Up @@ -984,7 +983,7 @@ int rpcx_execute(connection *c, int op, raw_message *raw) {
int64_t left_bytes_without_headers = tl_fetch_unread();

len -= (left_bytes_with_headers - left_bytes_without_headers);
assert(len % 4 == 0);
assert(len % sizeof(int) == 0);

long long req_id = header.qid;

Expand All @@ -1010,19 +1009,17 @@ int rpcx_execute(connection *c, int op, raw_message *raw) {
double actual_script_timeout = custom_settings.has_timeout() ? normalize_script_timeout(custom_settings.php_timeout_ms / 1000.0) : script_timeout;
set_connection_timeout(c, actual_script_timeout);

char buf[len + 1];
auto fetched_bytes = tl_fetch_data(buf, len);
std::vector<int> buffer(len / sizeof(int));
auto fetched_bytes = tl_fetch_data(buffer.data(), len);
if (fetched_bytes == -1) {
client_rpc_error(c, req_id, tl_fetch_error_code(), tl_fetch_error_string());
return 0;
}
assert(fetched_bytes == len);
auto *D = TCP_RPC_DATA(c);
rpc_query_data *rpc_data = rpc_query_data_create(std::move(header), reinterpret_cast<int *>(buf), len / static_cast<int>(sizeof(int)), D->remote_pid.ip,
D->remote_pid.port, D->remote_pid.pid, D->remote_pid.utime);

PhpWorker *worker = new PhpWorker(run_once ? once_worker : rpc_worker, c, nullptr, rpc_data, nullptr, req_id, actual_script_timeout);
D->extra = worker;
php_query_data_t rpc_data = rpc_query_data{std::move(header), std::move(buffer), D->remote_pid};
php_worker.emplace(run_once ? once_worker : rpc_worker, c, std::move(rpc_data), req_id, actual_script_timeout);
D->extra = &php_worker.value();

c->status = conn_wait_net;
rpcx_func_wakeup(c);
Expand Down
5 changes: 4 additions & 1 deletion server/php-engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

#pragma once

#include <optional>

#include "common/sanitizer.h"

#include "server/php-queries.h"
Expand Down Expand Up @@ -56,7 +58,8 @@ extern int run_once_count;
extern int queries_to_recreate_script;

class PhpScript;
extern PhpScript *php_script;

extern std::optional<PhpScript> php_script;

void turn_sigterm_on();

Expand Down
10 changes: 5 additions & 5 deletions server/php-queries.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -946,11 +946,11 @@ void db_run_query(int host_num, const char *request, int request_len, int timeou
}

void http_send_immediate_response(const char *headers, int headers_len, const char *body, int body_len) {
php_assert(active_worker != nullptr);
if (active_worker->mode == http_worker) {
write_out(&active_worker->conn->Out, headers, headers_len);
write_out(&active_worker->conn->Out, body, body_len);
flush_connection_output(active_worker->conn);
php_assert(php_worker.has_value());
if (php_worker->mode == http_worker) {
write_out(&php_worker->conn->Out, headers, headers_len);
write_out(&php_worker->conn->Out, body, body_len);
flush_connection_output(php_worker->conn);
} else {
php_warning("Immediate HTTP response available only from HTTP worker");
}
Expand Down
Loading