From e8da89fb528f7c6d22aeb1834b423dc0e4f8566b Mon Sep 17 00:00:00 2001 From: Alexander Polyakov Date: Thu, 12 Sep 2024 16:11:23 +0300 Subject: [PATCH] Add job workers support to K2 runtime (#1097) --- builtin-functions/kphp-light/functions.txt | 3 +- builtin-functions/kphp-light/job-workers.txt | 49 ++++++ .../kphp-light/unsupported-functions.txt | 1 - .../kphp-light/unsupported/job-worker.txt | 52 ------ .../class-instance/refcountable-php-classes.h | 9 +- runtime-light/component/component.cmake | 2 +- runtime-light/component/component.cpp | 42 +---- runtime-light/component/component.h | 7 + runtime-light/component/image.h | 1 - runtime-light/component/init-functions.cpp | 69 ++++++++ runtime-light/component/init-functions.h | 16 ++ runtime-light/stdlib/array/array-functions.h | 15 +- runtime-light/stdlib/fork/fork-context.h | 2 +- runtime-light/stdlib/fork/fork-functions.h | 8 +- .../stdlib/job-worker/job-worker-api.cpp | 150 ++++++++++++++++++ .../stdlib/job-worker/job-worker-api.h | 36 +++++ .../stdlib/job-worker/job-worker-context.cpp | 16 ++ .../stdlib/job-worker/job-worker-context.h | 29 ++++ runtime-light/stdlib/job-worker/job-worker.h | 87 ++++++++++ runtime-light/stdlib/rpc/rpc-api.h | 6 +- runtime-light/stdlib/stdlib.cmake | 37 +++-- runtime-light/tl/tl-core.h | 21 ++- runtime-light/tl/tl-functions.cpp | 48 ++++++ runtime-light/tl/tl-functions.h | 29 ++++ runtime-light/tl/tl-types.cpp | 41 +++++ runtime-light/tl/tl-types.h | 25 +++ runtime-light/tl/tl.cmake | 6 +- tests/k2-components/test_job_worker.php | 12 ++ tests/k2-components/test_job_worker_multi.php | 18 +++ .../k2-components/test_job_worker_noreply.php | 11 ++ .../3_mutable_shared_memory_piece.php | 2 +- ..._not_single_shared_memory_piece_member.php | 2 +- 32 files changed, 707 insertions(+), 145 deletions(-) create mode 100644 builtin-functions/kphp-light/job-workers.txt delete mode 100644 builtin-functions/kphp-light/unsupported/job-worker.txt create mode 100644 runtime-light/component/init-functions.cpp create mode 100644 runtime-light/component/init-functions.h create mode 100644 runtime-light/stdlib/job-worker/job-worker-api.cpp create mode 100644 runtime-light/stdlib/job-worker/job-worker-api.h create mode 100644 runtime-light/stdlib/job-worker/job-worker-context.cpp create mode 100644 runtime-light/stdlib/job-worker/job-worker-context.h create mode 100644 runtime-light/stdlib/job-worker/job-worker.h create mode 100644 runtime-light/tl/tl-functions.cpp create mode 100644 runtime-light/tl/tl-functions.h create mode 100644 runtime-light/tl/tl-types.cpp create mode 100644 runtime-light/tl/tl-types.h create mode 100644 tests/k2-components/test_job_worker.php create mode 100644 tests/k2-components/test_job_worker_multi.php create mode 100644 tests/k2-components/test_job_worker_noreply.php diff --git a/builtin-functions/kphp-light/functions.txt b/builtin-functions/kphp-light/functions.txt index 82a04abe61..7b7e75f2e8 100644 --- a/builtin-functions/kphp-light/functions.txt +++ b/builtin-functions/kphp-light/functions.txt @@ -1,5 +1,7 @@ | false; + +/** @kphp-extern-func-info interruptible */ +function kphp_job_worker_start_no_reply(string $request, float $timeout): bool; + +/** @kphp-extern-func-info interruptible */ +function kphp_job_worker_start_multi(string[] $request, float $timeout): (future | false)[]; + +/** @kphp-extern-func-info interruptible */ +function kphp_job_worker_fetch_request(): string; + +/** @kphp-extern-func-info interruptible */ +function kphp_job_worker_store_response(string $response): int; + +function is_kphp_job_workers_enabled(): bool; + +function get_job_workers_number(): int; + diff --git a/builtin-functions/kphp-light/unsupported-functions.txt b/builtin-functions/kphp-light/unsupported-functions.txt index 021f080ebc..bc71685454 100644 --- a/builtin-functions/kphp-light/unsupported-functions.txt +++ b/builtin-functions/kphp-light/unsupported-functions.txt @@ -7,7 +7,6 @@ require_once __DIR__ . '/unsupported/error.txt'; require_once __DIR__ . '/unsupported/file.txt'; require_once __DIR__ . '/unsupported/fork.txt'; require_once __DIR__ . '/unsupported/hash.txt'; -require_once __DIR__ . '/unsupported/job-worker.txt'; require_once __DIR__ . '/unsupported/kml.txt'; require_once __DIR__ . '/unsupported/kphp-toggles.txt'; require_once __DIR__ . '/unsupported/kphp-tracing.txt'; diff --git a/builtin-functions/kphp-light/unsupported/job-worker.txt b/builtin-functions/kphp-light/unsupported/job-worker.txt deleted file mode 100644 index 3b370da193..0000000000 --- a/builtin-functions/kphp-light/unsupported/job-worker.txt +++ /dev/null @@ -1,52 +0,0 @@ - | false; -/** @kphp-extern-func-info generate-stub */ -function kphp_job_worker_start_no_reply(KphpJobWorkerRequest $request, float $timeout) ::: bool; -/** @kphp-extern-func-info generate-stub */ -function kphp_job_worker_start_multi(KphpJobWorkerRequest[] $request, float $timeout) ::: (future | false)[]; -/** @kphp-extern-func-info generate-stub */ -function kphp_job_worker_store_response(KphpJobWorkerResponse $response) ::: int; - diff --git a/runtime-core/class-instance/refcountable-php-classes.h b/runtime-core/class-instance/refcountable-php-classes.h index 92db4d16a7..04c54f164c 100644 --- a/runtime-core/class-instance/refcountable-php-classes.h +++ b/runtime-core/class-instance/refcountable-php-classes.h @@ -21,7 +21,7 @@ class abstract_refcountable_php_interface : public ScriptAllocatorManaged { virtual void *get_instance_data_raw_ptr() noexcept = 0; }; -template +template class refcountable_polymorphic_php_classes : public Bases... { public: void add_ref() noexcept final { @@ -55,7 +55,7 @@ class refcountable_polymorphic_php_classes : public Bases... { uint32_t refcnt{0}; }; -template +template class refcountable_polymorphic_php_classes_virt : public virtual abstract_refcountable_php_interface, public Interfaces... { public: refcountable_polymorphic_php_classes_virt() __attribute__((always_inline)) = default; @@ -98,7 +98,7 @@ class refcountable_polymorphic_php_classes_virt<> : public virtual abstract_refc }; template -class refcountable_php_classes : public ScriptAllocatorManaged { +class refcountable_php_classes : public ScriptAllocatorManaged { public: void add_ref() noexcept { if (refcnt < ExtraRefCnt::for_global_const) { @@ -133,6 +133,7 @@ class refcountable_php_classes : public ScriptAllocatorManaged { void *get_instance_data_raw_ptr() noexcept { return this; } + private: uint32_t refcnt{0}; }; @@ -144,6 +145,6 @@ class refcountable_empty_php_classes { }; struct may_be_mixed_base : public virtual abstract_refcountable_php_interface { - virtual ~may_be_mixed_base() = default; + ~may_be_mixed_base() override = default; virtual const char *get_class() const noexcept = 0; }; diff --git a/runtime-light/component/component.cmake b/runtime-light/component/component.cmake index e67a81992e..01c9c5ede4 100644 --- a/runtime-light/component/component.cmake +++ b/runtime-light/component/component.cmake @@ -1 +1 @@ -prepend(RUNTIME_COMPONENT_SRC component/ component.cpp) +prepend(RUNTIME_COMPONENT_SRC component/ component.cpp init-functions.cpp) diff --git a/runtime-light/component/component.cpp b/runtime-light/component/component.cpp index 161c8146ec..6a58fcbe4d 100644 --- a/runtime-light/component/component.cpp +++ b/runtime-light/component/component.cpp @@ -10,49 +10,19 @@ #include #include "runtime-core/utils/kphp-assert-core.h" +#include "runtime-light/component/init-functions.h" #include "runtime-light/core/globals/php-init-scripts.h" -#include "runtime-light/coroutine/awaitable.h" #include "runtime-light/coroutine/task.h" #include "runtime-light/header.h" #include "runtime-light/scheduler/scheduler.h" +#include "runtime-light/stdlib/job-worker/job-worker-context.h" #include "runtime-light/streams/streams.h" #include "runtime-light/utils/context.h" -#include "runtime-light/utils/json-functions.h" namespace { -constexpr uint32_t K2_INVOKE_HTTP_MAGIC = 0xd909efe8; -constexpr uint32_t K2_INVOKE_JOB_WORKER_MAGIC = 0x437d7312; - -void init_http_superglobals(const string &http_query) noexcept { +int32_t merge_output_buffers() noexcept { auto &component_ctx{*get_component_context()}; - component_ctx.php_script_mutable_globals_singleton.get_superglobals().v$_SERVER.set_value(string{"QUERY_TYPE"}, string{"http"}); - component_ctx.php_script_mutable_globals_singleton.get_superglobals().v$_POST = f$json_decode(http_query, true); -} - -task_t init_kphp_cli_component() noexcept { - co_return co_await wait_for_incoming_stream_t{}; -} - -task_t init_kphp_server_component() noexcept { - uint32_t magic{}; - const auto stream_d{co_await wait_for_incoming_stream_t{}}; - const auto read{co_await read_exact_from_stream(stream_d, reinterpret_cast(std::addressof(magic)), sizeof(uint32_t))}; - php_assert(read == sizeof(uint32_t)); - if (magic == K2_INVOKE_HTTP_MAGIC) { - const auto [buffer, size]{co_await read_all_from_stream(stream_d)}; - init_http_superglobals(string{buffer, static_cast(size)}); - get_platform_context()->allocator.free(buffer); - } else if (magic == K2_INVOKE_JOB_WORKER_MAGIC) { - php_error("not implemented"); - } else { - php_error("server got unexpected type of request: 0x%x", magic); - } - - co_return stream_d; -} - -int32_t merge_output_buffers(ComponentState &component_ctx) noexcept { Response &response{component_ctx.response}; php_assert(response.current_buffer >= 0); @@ -95,12 +65,16 @@ task_t ComponentState::run_component_epilogue() noexcept { if (component_kind_ == ComponentKind::Oneshot || component_kind_ == ComponentKind::Multishot) { co_return; } + // do not flush output buffers if we are in job worker + if (JobWorkerServerComponentContext::get().kind != JobWorkerServerComponentContext::Kind::Invalid) { + co_return; + } if (standard_stream() == INVALID_PLATFORM_DESCRIPTOR) { poll_status = PollStatus::PollFinishedError; co_return; } - const auto &buffer{response.output_buffers[merge_output_buffers(*this)]}; + const auto &buffer{response.output_buffers[merge_output_buffers()]}; if ((co_await write_all_to_stream(standard_stream(), buffer.buffer(), buffer.size())) != buffer.size()) { php_warning("can't write component result to stream %" PRIu64, standard_stream()); } diff --git a/runtime-light/component/component.h b/runtime-light/component/component.h index b3c48246dc..27cfb1cff3 100644 --- a/runtime-light/component/component.h +++ b/runtime-light/component/component.h @@ -17,6 +17,7 @@ #include "runtime-light/header.h" #include "runtime-light/scheduler/scheduler.h" #include "runtime-light/stdlib/fork/fork-context.h" +#include "runtime-light/stdlib/job-worker/job-worker-context.h" #include "runtime-light/stdlib/output/output-buffer.h" #include "runtime-light/stdlib/regex/regex-functions.h" #include "runtime-light/stdlib/curl/curl.h" @@ -63,6 +64,10 @@ struct ComponentState { task_t run_component_epilogue() noexcept; + ComponentKind component_kind() const noexcept { + return component_kind_; + } + void process_platform_updates() noexcept; bool stream_updated(uint64_t stream_d) const noexcept { @@ -94,6 +99,8 @@ struct ComponentState { KphpCoreContext kphp_core_context; RpcComponentContext rpc_component_context; + JobWorkerClientComponentContext job_worker_client_component_context{}; + JobWorkerServerComponentContext job_worker_server_component_context{}; RegexComponentState regex_component_context; CurlComponentState curl_component_state; diff --git a/runtime-light/component/image.h b/runtime-light/component/image.h index 40025b9c40..0dc98eb2b5 100644 --- a/runtime-light/component/image.h +++ b/runtime-light/component/image.h @@ -4,7 +4,6 @@ #pragma once -#include "runtime-light/header.h" #include "runtime-light/stdlib/rpc/rpc-context.h" struct ImageState { diff --git a/runtime-light/component/init-functions.cpp b/runtime-light/component/init-functions.cpp new file mode 100644 index 0000000000..e85b9982c6 --- /dev/null +++ b/runtime-light/component/init-functions.cpp @@ -0,0 +1,69 @@ +// Compiler for PHP (aka KPHP) +// Copyright (c) 2024 LLC «V Kontakte» +// Distributed under the GPL v3 License, see LICENSE.notice.txt + +#include "runtime-light/component/init-functions.h" + +#include + +#include "runtime-core/runtime-core.h" +#include "runtime-core/utils/kphp-assert-core.h" +#include "runtime-light/component/component.h" +#include "runtime-light/coroutine/awaitable.h" +#include "runtime-light/coroutine/task.h" +#include "runtime-light/header.h" +#include "runtime-light/stdlib/job-worker/job-worker-context.h" +#include "runtime-light/streams/streams.h" +#include "runtime-light/tl/tl-core.h" +#include "runtime-light/tl/tl-functions.h" +#include "runtime-light/utils/context.h" + +namespace { + +void process_k2_invoke_job_worker(tl::TLBuffer &tlb) noexcept { + tl::K2InvokeJobWorker invoke_jw{}; + if (!invoke_jw.fetch(tlb)) { + php_error("erroneous job worker request"); + } + php_assert(invoke_jw.image_id == vk_k2_describe()->build_timestamp); // ensure that we got the request from ourselves + + auto &jw_server_ctx{JobWorkerServerComponentContext::get()}; + jw_server_ctx.kind = invoke_jw.ignore_answer ? JobWorkerServerComponentContext::Kind::NoReply : JobWorkerServerComponentContext::Kind::Regular; + jw_server_ctx.state = JobWorkerServerComponentContext::State::Working; + jw_server_ctx.job_id = invoke_jw.job_id; + jw_server_ctx.body = std::move(invoke_jw.body); + get_component_context()->php_script_mutable_globals_singleton.get_superglobals().v$_SERVER.set_value(string{"JOB_ID"}, invoke_jw.job_id); +} + +void process_k2_invoke_http([[maybe_unused]] tl::TLBuffer &tlb) noexcept {} + +} // namespace + +task_t init_kphp_server_component() noexcept { + auto stream_d{co_await wait_for_incoming_stream_t{}}; + const auto [buffer, size]{co_await read_all_from_stream(stream_d)}; + php_assert(size >= sizeof(uint32_t)); // check that we can fetch at least magic + tl::TLBuffer tlb{}; + tlb.store_bytes(buffer, static_cast(size)); + get_platform_context()->allocator.free(buffer); + + switch (const auto magic{*reinterpret_cast(tlb.data())}) { // lookup magic + case tl::K2_INVOKE_HTTP_MAGIC: { + process_k2_invoke_http(tlb); + break; + } + case tl::K2_INVOKE_JOB_WORKER_MAGIC: { + process_k2_invoke_job_worker(tlb); + // release standard stream in case of a no reply job worker since we don't need that stream anymore + if (JobWorkerServerComponentContext::get().kind == JobWorkerServerComponentContext::Kind::NoReply) { + get_component_context()->release_stream(stream_d); + stream_d = INVALID_PLATFORM_DESCRIPTOR; + } + break; + } + default: { + php_error("unexpected magic: 0x%x", magic); + } + } + co_return stream_d; +} diff --git a/runtime-light/component/init-functions.h b/runtime-light/component/init-functions.h new file mode 100644 index 0000000000..f320fd906b --- /dev/null +++ b/runtime-light/component/init-functions.h @@ -0,0 +1,16 @@ +// Compiler for PHP (aka KPHP) +// Copyright (c) 2024 LLC «V Kontakte» +// Distributed under the GPL v3 License, see LICENSE.notice.txt + +#pragma once + +#include "runtime-light/coroutine/awaitable.h" +#include "runtime-light/coroutine/task.h" + +// Returns a stream descriptor that is supposed to be a stream to stdout +inline task_t init_kphp_cli_component() noexcept { + co_return co_await wait_for_incoming_stream_t{}; +} + +// Performs some initialization and returns a stream descriptor we need to write server response into +task_t init_kphp_server_component() noexcept; diff --git a/runtime-light/stdlib/array/array-functions.h b/runtime-light/stdlib/array/array-functions.h index 77e4d4a29d..65cd664eeb 100644 --- a/runtime-light/stdlib/array/array-functions.h +++ b/runtime-light/stdlib/array/array-functions.h @@ -6,9 +6,9 @@ #include "runtime-core/runtime-core.h" -constexpr int64_t SORT_REGULAR = 0; -constexpr int64_t SORT_NUMERIC = 1; -constexpr int64_t SORT_STRING = 2; +inline constexpr int64_t SORT_REGULAR = 0; +inline constexpr int64_t SORT_NUMERIC = 1; +inline constexpr int64_t SORT_STRING = 2; template string f$implode(const string &s, const array &a) { @@ -60,13 +60,11 @@ array f$array_filter_by_key(const array &a, const T1 &callback) noexcept { php_critical_error("call to unsupported function"); } - template, T>> array f$array_map(const CallbackT &callback, const array &a) { php_critical_error("call to unsupported function"); } - template R f$array_reduce(const array &a, const CallbackT &callback, InitialT initial) { php_critical_error("call to unsupported function"); @@ -104,7 +102,7 @@ T f$array_merge(const T &a1, const T &a2, const T &a3, const T &a4 = T(), const php_critical_error("call to unsupported function"); } -template +template ReturnT f$array_merge_recursive(const Args &...args) { php_critical_error("call to unsupported function"); } @@ -446,7 +444,6 @@ void f$array_swap_int_keys(array &a, int64_t idx1, int64_t idx2) noexcept { php_critical_error("call to unsupported function"); } - template array f$to_array_debug(const class_instance &klass, bool with_class_names = false) { php_critical_error("call to unsupported function"); @@ -517,7 +514,7 @@ inline Optional> f$array_column(const array &a, const mixed } template -auto f$array_column(const Optional &a, const mixed &column_key, const mixed &index_key = {}) -> decltype(f$array_column(std::declval(), column_key, index_key)) { +auto f$array_column(const Optional &a, const mixed &column_key, + const mixed &index_key = {}) -> decltype(f$array_column(std::declval(), column_key, index_key)) { php_critical_error("call to unsupported function"); } - diff --git a/runtime-light/stdlib/fork/fork-context.h b/runtime-light/stdlib/fork/fork-context.h index 153e34fdd0..abe33ae4d3 100644 --- a/runtime-light/stdlib/fork/fork-context.h +++ b/runtime-light/stdlib/fork/fork-context.h @@ -12,7 +12,7 @@ #include "runtime-light/coroutine/task.h" #include "runtime-light/utils/concepts.h" -constexpr int64_t INVALID_FORK_ID = -1; +inline constexpr int64_t INVALID_FORK_ID = -1; class ForkComponentContext { template diff --git a/runtime-light/stdlib/fork/fork-functions.h b/runtime-light/stdlib/fork/fork-functions.h index 6ee65fa110..5e98a6d6cb 100644 --- a/runtime-light/stdlib/fork/fork-functions.h +++ b/runtime-light/stdlib/fork/fork-functions.h @@ -17,10 +17,10 @@ namespace fork_api_impl_ { -constexpr double MAX_TIMEOUT_S = 86400.0; -constexpr double DEFAULT_TIMEOUT_S = MAX_TIMEOUT_S; -constexpr auto MAX_TIMEOUT_NS = std::chrono::duration_cast(std::chrono::duration{MAX_TIMEOUT_S}); -constexpr auto DEFAULT_TIMEOUT_NS = std::chrono::duration_cast(std::chrono::duration{DEFAULT_TIMEOUT_S}); +inline constexpr double MAX_TIMEOUT_S = 86400.0; +inline constexpr double DEFAULT_TIMEOUT_S = MAX_TIMEOUT_S; +inline constexpr auto MAX_TIMEOUT_NS = std::chrono::duration_cast(std::chrono::duration{MAX_TIMEOUT_S}); +inline constexpr auto DEFAULT_TIMEOUT_NS = std::chrono::duration_cast(std::chrono::duration{DEFAULT_TIMEOUT_S}); } // namespace fork_api_impl_ diff --git a/runtime-light/stdlib/job-worker/job-worker-api.cpp b/runtime-light/stdlib/job-worker/job-worker-api.cpp new file mode 100644 index 0000000000..6fee43a6dc --- /dev/null +++ b/runtime-light/stdlib/job-worker/job-worker-api.cpp @@ -0,0 +1,150 @@ +// Compiler for PHP (aka KPHP) +// Copyright (c) 2024 LLC «V Kontakte» +// Distributed under the GPL v3 License, see LICENSE.notice.txt + +#include "runtime-light/stdlib/job-worker/job-worker-api.h" + +#include +#include +#include +#include +#include + +#include "runtime-core/runtime-core.h" +#include "runtime-core/utils/kphp-assert-core.h" +#include "runtime-light/component/component.h" +#include "runtime-light/coroutine/awaitable.h" +#include "runtime-light/coroutine/task.h" +#include "runtime-light/header.h" +#include "runtime-light/stdlib/component/component-api.h" +#include "runtime-light/stdlib/fork/fork-context.h" +#include "runtime-light/stdlib/job-worker/job-worker-context.h" +#include "runtime-light/stdlib/job-worker/job-worker.h" +#include "runtime-light/streams/streams.h" +#include "runtime-light/tl/tl-core.h" +#include "runtime-light/tl/tl-functions.h" +#include "runtime-light/tl/tl-types.h" +#include "runtime-light/utils/context.h" + +namespace { + +constexpr const char *JOB_WORKER_COMPONENT_NAME = "_self"; + +constexpr double MIN_TIMEOUT_S = 0.05; +constexpr double MAX_TIMEOUT_S = 86400.0; + +task_t kphp_job_worker_start_impl(string request, double timeout, bool ignore_answer) noexcept { + if (!f$is_kphp_job_workers_enabled()) { + php_warning("can't start job worker: job workers are disabled"); + co_return INVALID_FORK_ID; + } + if (request.empty()) { + php_warning("job worker request is empty"); + co_return INVALID_FORK_ID; + } + + auto &jw_client_ctx{JobWorkerClientComponentContext::get()}; + // normalize timeout + const auto timeout_ns{std::chrono::duration_cast(std::chrono::duration(std::clamp(timeout, MIN_TIMEOUT_S, MAX_TIMEOUT_S)))}; + // prepare JW component request + tl::TLBuffer tlb{}; + const tl::K2InvokeJobWorker invoke_jw{.image_id = vk_k2_describe()->build_timestamp, + .job_id = jw_client_ctx.current_job_id++, + .ignore_answer = ignore_answer, + .timeout_ns = static_cast(timeout_ns.count()), + .body = std::move(request)}; + invoke_jw.store(tlb); + + // send JW request + auto comp_query{co_await f$component_client_send_request(string{JOB_WORKER_COMPONENT_NAME}, string{tlb.data(), static_cast(tlb.size())})}; + if (comp_query.is_null()) { + php_warning("couldn't start job worker"); + co_return INVALID_FORK_ID; + } + // create fork to wait for job worker response. we need to do it even if 'ignore_answer' is 'true' to make sure + // that the stream will not be closed too early. otherwise, platform may even not send job worker request + auto waiter_task{[](auto comp_query, std::chrono::nanoseconds timeout) noexcept -> task_t { + auto fetch_task{f$component_client_fetch_response(std::move(comp_query))}; + const string response{(co_await wait_with_timeout_t{task_t::awaiter_t{std::addressof(fetch_task)}, timeout}).value_or(string{})}; + + tl::TLBuffer tlb{}; + tlb.store_bytes(response.c_str(), static_cast(response.size())); + tl::K2JobWorkerResponse jw_response{}; + if (!jw_response.fetch(tlb)) { + co_return string{}; + } + co_return std::move(jw_response.body); + }(std::move(comp_query), timeout_ns)}; + // start waiter fork and return its ID + co_return(co_await start_fork_t{static_cast>(std::move(waiter_task)), start_fork_t::execution::self}); +} + +} // namespace + +// ================================================================================================ + +task_t> f$kphp_job_worker_start(string request, double timeout) noexcept { + const auto fork_id{co_await kphp_job_worker_start_impl(std::move(request), timeout, false)}; + co_return fork_id != INVALID_FORK_ID ? fork_id : false; +} + +task_t f$kphp_job_worker_start_no_reply(string request, double timeout) noexcept { + const auto fork_id{co_await kphp_job_worker_start_impl(std::move(request), timeout, true)}; + co_return fork_id != INVALID_FORK_ID; +} + +task_t>> f$kphp_job_worker_start_multi(array requests, double timeout) noexcept { + array> fork_ids{requests.size()}; + for (const auto &it : requests) { + const auto fork_id{co_await kphp_job_worker_start_impl(it.get_value(), timeout, false)}; + fork_ids.set_value(it.get_key(), fork_id != INVALID_FORK_ID ? fork_id : false); + } + co_return fork_ids; +} + +// ================================================================================================ + +task_t f$kphp_job_worker_fetch_request() noexcept { + if (!f$is_kphp_job_workers_enabled()) { + php_warning("couldn't fetch job worker request: job workers are disabled"); + co_return string{}; + } + + auto &jw_server_ctx{JobWorkerServerComponentContext::get()}; + if (jw_server_ctx.job_id == JOB_WORKER_INVALID_JOB_ID || jw_server_ctx.body.empty()) { + php_warning("couldn't fetch job worker request"); + co_return string{}; + } + co_return std::exchange(jw_server_ctx.body, string{}); +} + +task_t f$kphp_job_worker_store_response(string response) noexcept { + auto &component_ctx{*get_component_context()}; + auto &jw_server_ctx{JobWorkerServerComponentContext::get()}; + if (!f$is_kphp_job_workers_enabled()) { // workers are enabled + php_warning("couldn't store job worker response: job workers are disabled"); + co_return static_cast(JobWorkerError::store_response_incorrect_call_error); + } else if (jw_server_ctx.kind != JobWorkerServerComponentContext::Kind::Regular) { // we're in regular worker + php_warning("couldn't store job worker response: we are either in no reply job worker or not in a job worker at all"); + co_return static_cast(JobWorkerError::store_response_incorrect_call_error); + } else if (jw_server_ctx.state == JobWorkerServerComponentContext::State::Replied) { // it's the first attempt to reply + php_warning("couldn't store job worker response: multiple stores are forbidden"); + co_return static_cast(JobWorkerError::store_response_incorrect_call_error); + } else if (component_ctx.standard_stream() == INVALID_PLATFORM_DESCRIPTOR) { // we have a stream to write into + php_warning("couldn't store job worker response: no standard stream"); + co_return static_cast(JobWorkerError::store_response_incorrect_call_error); + } else if (response.empty()) { // we have a response to reply + php_warning("couldn't store job worker response: it shouldn't be empty"); + co_return static_cast(JobWorkerError::store_response_incorrect_call_error); + } + + tl::TLBuffer tlb{}; + tl::K2JobWorkerResponse jw_response{.job_id = jw_server_ctx.job_id, .body = std::move(response)}; + jw_response.store(tlb); + if ((co_await write_all_to_stream(component_ctx.standard_stream(), tlb.data(), tlb.size())) != tlb.size()) { + php_warning("couldn't store job worker response"); + co_return static_cast(JobWorkerError::store_response_cant_send_error); + } + jw_server_ctx.state = JobWorkerServerComponentContext::State::Replied; + co_return 0; +} diff --git a/runtime-light/stdlib/job-worker/job-worker-api.h b/runtime-light/stdlib/job-worker/job-worker-api.h new file mode 100644 index 0000000000..47b21973c6 --- /dev/null +++ b/runtime-light/stdlib/job-worker/job-worker-api.h @@ -0,0 +1,36 @@ +// Compiler for PHP (aka KPHP) +// Copyright (c) 2024 LLC «V Kontakte» +// Distributed under the GPL v3 License, see LICENSE.notice.txt + +#pragma once + +#include + +#include "runtime-core/runtime-core.h" +#include "runtime-light/component/component.h" +#include "runtime-light/coroutine/task.h" +#include "runtime-light/utils/context.h" + +// === Client ===================================================================================== + +task_t> f$kphp_job_worker_start(string request, double timeout) noexcept; + +task_t f$kphp_job_worker_start_no_reply(string request, double timeout) noexcept; + +task_t>> f$kphp_job_worker_start_multi(array requests, double timeout) noexcept; + +// === Server ===================================================================================== + +task_t f$kphp_job_worker_fetch_request() noexcept; + +task_t f$kphp_job_worker_store_response(string response) noexcept; + +// === Misc ======================================================================================= + +inline bool f$is_kphp_job_workers_enabled() noexcept { + return get_component_context()->component_kind() == ComponentKind::Server; +} + +inline int64_t f$get_job_workers_number() noexcept { + return 50; // TODO +} diff --git a/runtime-light/stdlib/job-worker/job-worker-context.cpp b/runtime-light/stdlib/job-worker/job-worker-context.cpp new file mode 100644 index 0000000000..35b3db42d6 --- /dev/null +++ b/runtime-light/stdlib/job-worker/job-worker-context.cpp @@ -0,0 +1,16 @@ +// Compiler for PHP (aka KPHP) +// Copyright (c) 2024 LLC «V Kontakte» +// Distributed under the GPL v3 License, see LICENSE.notice.txt + +#include "runtime-light/stdlib/job-worker/job-worker-context.h" + +#include "runtime-light/component/component.h" +#include "runtime-light/utils/context.h" + +JobWorkerServerComponentContext &JobWorkerServerComponentContext::get() noexcept { + return get_component_context()->job_worker_server_component_context; +} + +JobWorkerClientComponentContext &JobWorkerClientComponentContext::get() noexcept { + return get_component_context()->job_worker_client_component_context; +} diff --git a/runtime-light/stdlib/job-worker/job-worker-context.h b/runtime-light/stdlib/job-worker/job-worker-context.h new file mode 100644 index 0000000000..f0f27aa79c --- /dev/null +++ b/runtime-light/stdlib/job-worker/job-worker-context.h @@ -0,0 +1,29 @@ +// Compiler for PHP (aka KPHP) +// Copyright (c) 2024 LLC «V Kontakte» +// Distributed under the GPL v3 License, see LICENSE.notice.txt + +#pragma once + +#include + +#include "common/mixin/not_copyable.h" +#include "runtime-core/runtime-core.h" +#include "runtime-light/stdlib/job-worker/job-worker.h" + +struct JobWorkerServerComponentContext final : private vk::not_copyable { + enum class Kind : uint8_t { Invalid, Regular, NoReply }; + enum class State : uint8_t { Invalid, Working, Replied }; + + Kind kind{Kind::Invalid}; + State state{State::Invalid}; + int64_t job_id{JOB_WORKER_INVALID_JOB_ID}; + string body; + + static JobWorkerServerComponentContext &get() noexcept; +}; + +struct JobWorkerClientComponentContext final : private vk::not_copyable { + int64_t current_job_id{JOB_WORKER_VALID_JOB_ID_RANGE_START}; + + static JobWorkerClientComponentContext &get() noexcept; +}; diff --git a/runtime-light/stdlib/job-worker/job-worker.h b/runtime-light/stdlib/job-worker/job-worker.h new file mode 100644 index 0000000000..e356d71132 --- /dev/null +++ b/runtime-light/stdlib/job-worker/job-worker.h @@ -0,0 +1,87 @@ +// Compiler for PHP (aka KPHP) +// Copyright (c) 2024 LLC «V Kontakte» +// Distributed under the GPL v3 License, see LICENSE.notice.txt + +#pragma once + +#include +#include +#include +#include + +#include "runtime-core/class-instance/refcountable-php-classes.h" +#include "runtime-core/runtime-core.h" + +inline constexpr int64_t JOB_WORKER_VALID_JOB_ID_RANGE_START = 0; +inline constexpr int64_t JOB_WORKER_INVALID_JOB_ID = -1; + +namespace job_worker_impl_ { + +struct SendableBase : virtual abstract_refcountable_php_interface { + virtual const char *get_class() const noexcept = 0; + virtual int32_t get_hash() const noexcept = 0; + virtual size_t virtual_builtin_sizeof() const noexcept = 0; + virtual SendableBase *virtual_builtin_clone() const noexcept = 0; + + ~SendableBase() override = default; +}; + +} // namespace job_worker_impl_ + +enum class JobWorkerError : int16_t { + store_response_incorrect_call_error = -3000, + store_response_cant_send_error = -3003, +}; + +// === KphpJobWorkerSharedMemoryPiece ============================================================= + +struct C$KphpJobWorkerSharedMemoryPiece : public job_worker_impl_::SendableBase { + C$KphpJobWorkerSharedMemoryPiece *virtual_builtin_clone() const noexcept override = 0; +}; + +// === KphpJobWorkerRequest ======================================================================= + +struct C$KphpJobWorkerRequest : public job_worker_impl_::SendableBase { + C$KphpJobWorkerRequest *virtual_builtin_clone() const noexcept override = 0; +}; + +// === KphpJobWorkerResponse ====================================================================== + +struct C$KphpJobWorkerResponse : public job_worker_impl_::SendableBase { + C$KphpJobWorkerResponse *virtual_builtin_clone() const noexcept override = 0; +}; + +// === KphpJobWorkerResponseError ================================================================= + +struct C$KphpJobWorkerResponseError : public refcountable_polymorphic_php_classes { + string error; + int64_t error_code; + + const char *get_class() const noexcept override { + return "KphpJobWorkerResponseError"; + } + + int32_t get_hash() const noexcept override { + return static_cast(std::hash{}(get_class())); + } + + size_t virtual_builtin_sizeof() const noexcept override { + return sizeof(*this); + } + + C$KphpJobWorkerResponseError *virtual_builtin_clone() const noexcept override { + return new C$KphpJobWorkerResponseError{*this}; + } +}; + +inline class_instance f$KphpJobWorkerResponseError$$__construct(class_instance v$this) noexcept { + return v$this; +} + +inline string f$KphpJobWorkerResponseError$$getError(class_instance v$this) noexcept { + return v$this.get()->error; +} + +inline int64_t f$KphpJobWorkerResponseError$$getErrorCode(class_instance v$this) noexcept { + return v$this.get()->error_code; +} diff --git a/runtime-light/stdlib/rpc/rpc-api.h b/runtime-light/stdlib/rpc/rpc-api.h index 22e2f3b3aa..1fedd4208a 100644 --- a/runtime-light/stdlib/rpc/rpc-api.h +++ b/runtime-light/stdlib/rpc/rpc-api.h @@ -14,9 +14,9 @@ #include "runtime-light/stdlib/rpc/rpc-tl-function.h" #include "runtime-light/stdlib/rpc/rpc-tl-kphp-request.h" -constexpr int64_t RPC_VALID_QUERY_ID_RANGE_START = 0; -constexpr int64_t RPC_INVALID_QUERY_ID = -1; -constexpr int64_t RPC_IGNORED_ANSWER_QUERY_ID = -2; +inline constexpr int64_t RPC_VALID_QUERY_ID_RANGE_START = 0; +inline constexpr int64_t RPC_INVALID_QUERY_ID = -1; +inline constexpr int64_t RPC_IGNORED_ANSWER_QUERY_ID = -2; namespace rpc_impl_ { diff --git a/runtime-light/stdlib/stdlib.cmake b/runtime-light/stdlib/stdlib.cmake index 32caac39d2..abefe33df5 100644 --- a/runtime-light/stdlib/stdlib.cmake +++ b/runtime-light/stdlib/stdlib.cmake @@ -1,18 +1,21 @@ prepend( - RUNTIME_STDLIB_SRC - stdlib/ - component/component-api.cpp - curl/curl.cpp - exit/exit-functions.cpp - fork/fork-context.cpp - output/output-buffer.cpp - output/print-functions.cpp - rpc/rpc-api.cpp - rpc/rpc-context.cpp - rpc/rpc-extra-headers.cpp - rpc/rpc-extra-info.cpp - rpc/rpc-tl-error.cpp - rpc/rpc-tl-query.cpp - rpc/rpc-tl-request.cpp - string/concat.cpp - regex/regex-functions.cpp) + RUNTIME_STDLIB_SRC + stdlib/ + component/component-api.cpp + crypto/crypto-functions.cpp + curl/curl.cpp + exit/exit-functions.cpp + fork/fork-context.cpp + job-worker/job-worker-api.cpp + job-worker/job-worker-context.cpp + output/output-buffer.cpp + output/print-functions.cpp + regex/regex-functions.cpp + rpc/rpc-api.cpp + rpc/rpc-context.cpp + rpc/rpc-extra-headers.cpp + rpc/rpc-extra-info.cpp + rpc/rpc-tl-error.cpp + rpc/rpc-tl-query.cpp + rpc/rpc-tl-request.cpp + string/concat.cpp) diff --git a/runtime-light/tl/tl-core.h b/runtime-light/tl/tl-core.h index ba179db59c..68467eff10 100644 --- a/runtime-light/tl/tl-core.h +++ b/runtime-light/tl/tl-core.h @@ -7,7 +7,6 @@ #include #include #include -#include #include "common/mixin/not_copyable.h" #include "runtime-core/runtime-core.h" @@ -15,16 +14,17 @@ #include "runtime-light/utils/concepts.h" namespace tl { -constexpr auto SMALL_STRING_SIZE_LEN = 1; -constexpr auto MEDIUM_STRING_SIZE_LEN = 3; -constexpr auto LARGE_STRING_SIZE_LEN = 7; -constexpr uint64_t SMALL_STRING_MAX_LEN = 253; -constexpr uint64_t MEDIUM_STRING_MAX_LEN = (static_cast(1) << 24) - 1; -[[maybe_unused]] constexpr uint64_t LARGE_STRING_MAX_LEN = (static_cast(1) << 56) - 1; +inline constexpr auto SMALL_STRING_SIZE_LEN = 1; +inline constexpr auto MEDIUM_STRING_SIZE_LEN = 3; +inline constexpr auto LARGE_STRING_SIZE_LEN = 7; -constexpr uint8_t LARGE_STRING_MAGIC = 0xff; -constexpr uint8_t MEDIUM_STRING_MAGIC = 0xfe; +inline constexpr uint64_t SMALL_STRING_MAX_LEN = 253; +inline constexpr uint64_t MEDIUM_STRING_MAX_LEN = (static_cast(1) << 24) - 1; +[[maybe_unused]] inline constexpr uint64_t LARGE_STRING_MAX_LEN = (static_cast(1) << 56) - 1; + +inline constexpr uint8_t LARGE_STRING_MAGIC = 0xff; +inline constexpr uint8_t MEDIUM_STRING_MAGIC = 0xfe; class TLBuffer : private vk::not_copyable { string_buffer m_buffer; @@ -74,8 +74,7 @@ class TLBuffer : private vk::not_copyable { } template - requires std::convertible_to - void store_trivial(const U &t) noexcept { + requires std::convertible_to void store_trivial(const U &t) noexcept { // Here we rely on that endianness of architecture is Little Endian store_bytes(reinterpret_cast(std::addressof(t)), sizeof(T)); } diff --git a/runtime-light/tl/tl-functions.cpp b/runtime-light/tl/tl-functions.cpp new file mode 100644 index 0000000000..f283afcda1 --- /dev/null +++ b/runtime-light/tl/tl-functions.cpp @@ -0,0 +1,48 @@ +// Compiler for PHP (aka KPHP) +// Copyright (c) 2024 LLC «V Kontakte» +// Distributed under the GPL v3 License, see LICENSE.notice.txt + +#include "runtime-light/tl/tl-functions.h" + +#include +#include + +#include "runtime-light/tl/tl-core.h" + +namespace { + +// magic + flags + image_id + job_id + minimum string size length +constexpr auto K2_INVOKE_JW_MIN_SIZE = sizeof(uint32_t) + sizeof(uint32_t) + sizeof(uint64_t) + sizeof(int64_t) + sizeof(uint64_t) + tl::SMALL_STRING_SIZE_LEN; + +constexpr auto K2_JOB_WORKER_IGNORE_ANSWER_FLAG = static_cast(1U << 0U); + +} // namespace + +namespace tl { + +bool K2InvokeJobWorker::fetch(TLBuffer &tlb) noexcept { + if (tlb.size() < K2_INVOKE_JW_MIN_SIZE || *tlb.fetch_trivial() != K2_INVOKE_JOB_WORKER_MAGIC) { + return false; + } + + const auto flags{*tlb.fetch_trivial()}; + ignore_answer = static_cast(flags & K2_JOB_WORKER_IGNORE_ANSWER_FLAG); + image_id = *tlb.fetch_trivial(); + job_id = *tlb.fetch_trivial(); + timeout_ns = *tlb.fetch_trivial(); + const std::string_view body_view{tlb.fetch_string()}; + body = string{body_view.data(), static_cast(body_view.size())}; + return true; +} + +void K2InvokeJobWorker::store(TLBuffer &tlb) const noexcept { + const uint32_t flags{ignore_answer ? K2_JOB_WORKER_IGNORE_ANSWER_FLAG : 0x0}; + tlb.store_trivial(K2_INVOKE_JOB_WORKER_MAGIC); + tlb.store_trivial(flags); + tlb.store_trivial(image_id); + tlb.store_trivial(job_id); + tlb.store_trivial(timeout_ns); + tlb.store_string({body.c_str(), body.size()}); +} + +} // namespace tl diff --git a/runtime-light/tl/tl-functions.h b/runtime-light/tl/tl-functions.h new file mode 100644 index 0000000000..8510875603 --- /dev/null +++ b/runtime-light/tl/tl-functions.h @@ -0,0 +1,29 @@ +// Compiler for PHP (aka KPHP) +// Copyright (c) 2024 LLC «V Kontakte» +// Distributed under the GPL v3 License, see LICENSE.notice.txt + +#pragma once + +#include + +#include "runtime-core/runtime-core.h" +#include "runtime-light/tl/tl-core.h" + +namespace tl { + +inline constexpr uint32_t K2_INVOKE_HTTP_MAGIC = 0xd909efe8; +inline constexpr uint32_t K2_INVOKE_JOB_WORKER_MAGIC = 0x437d7312; + +struct K2InvokeJobWorker final { + uint64_t image_id{}; + int64_t job_id{}; + bool ignore_answer{}; + uint64_t timeout_ns{}; + string body; + + bool fetch(TLBuffer &tlb) noexcept; + + void store(TLBuffer &tlb) const noexcept; +}; + +} // namespace tl diff --git a/runtime-light/tl/tl-types.cpp b/runtime-light/tl/tl-types.cpp new file mode 100644 index 0000000000..b742cf816a --- /dev/null +++ b/runtime-light/tl/tl-types.cpp @@ -0,0 +1,41 @@ +// Compiler for PHP (aka KPHP) +// Copyright (c) 2024 LLC «V Kontakte» +// Distributed under the GPL v3 License, see LICENSE.notice.txt + +#include "runtime-light/tl/tl-types.h" + +#include +#include +#include + +#include "runtime-light/tl/tl-core.h" + +namespace { + +// magic + flags + job_id + minimum string size length +constexpr auto K2_JOB_WORKER_RESPONSE_MIN_SIZE = sizeof(uint32_t) + sizeof(uint32_t) + sizeof(int64_t) + tl::SMALL_STRING_SIZE_LEN; + +} // namespace + +namespace tl { + +bool K2JobWorkerResponse::fetch(TLBuffer &tlb) noexcept { + if (tlb.size() < K2_JOB_WORKER_RESPONSE_MIN_SIZE || *tlb.fetch_trivial() != K2_JOB_WORKER_RESPONSE_MAGIC) { + return false; + } + + std::ignore = tlb.fetch_trivial(); // ignore flags + job_id = *tlb.fetch_trivial(); + const std::string_view body_view{tlb.fetch_string()}; + body = string{body_view.data(), static_cast(body_view.size())}; + return true; +} + +void K2JobWorkerResponse::store(TLBuffer &tlb) const noexcept { + tlb.store_trivial(K2_JOB_WORKER_RESPONSE_MAGIC); + tlb.store_trivial(0x0); // flags + tlb.store_trivial(job_id); + tlb.store_string({body.c_str(), body.size()}); +} + +} // namespace tl diff --git a/runtime-light/tl/tl-types.h b/runtime-light/tl/tl-types.h new file mode 100644 index 0000000000..8f08359de7 --- /dev/null +++ b/runtime-light/tl/tl-types.h @@ -0,0 +1,25 @@ +// Compiler for PHP (aka KPHP) +// Copyright (c) 2024 LLC «V Kontakte» +// Distributed under the GPL v3 License, see LICENSE.notice.txt + +#pragma once + +#include + +#include "runtime-core/runtime-core.h" +#include "runtime-light/tl/tl-core.h" + +namespace tl { + +inline constexpr uint32_t K2_JOB_WORKER_RESPONSE_MAGIC = 0x3afb3a08; + +struct K2JobWorkerResponse final { + int64_t job_id{}; + string body; + + bool fetch(TLBuffer &tlb) noexcept; + + void store(TLBuffer &tlb) const noexcept; +}; + +} // namespace tl diff --git a/runtime-light/tl/tl.cmake b/runtime-light/tl/tl.cmake index 59a7ae7772..248ed8a18a 100644 --- a/runtime-light/tl/tl.cmake +++ b/runtime-light/tl/tl.cmake @@ -1,4 +1,2 @@ -prepend(RUNTIME_TL_SRC tl/ - tl-builtins.cpp - tl-core.cpp -) +prepend(RUNTIME_TL_SRC tl/ tl-builtins.cpp tl-core.cpp tl-functions.cpp + tl-types.cpp) diff --git a/tests/k2-components/test_job_worker.php b/tests/k2-components/test_job_worker.php new file mode 100644 index 0000000000..2aab3314e9 --- /dev/null +++ b/tests/k2-components/test_job_worker.php @@ -0,0 +1,12 @@ +