Skip to content

Commit

Permalink
Support type erasure on task_t (#1074)
Browse files Browse the repository at this point in the history
We use type erasure to store forks' tasks in a ForkComponentContext
  • Loading branch information
apolyakov authored Aug 22, 2024
1 parent a5ae8f4 commit bd7874b
Show file tree
Hide file tree
Showing 33 changed files with 81 additions and 113 deletions.
2 changes: 1 addition & 1 deletion builtin-functions/kphp-light/functions.txt
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ function wait(future<any> | false $id, float $timeout = -1.0) ::: ^1[*] | null;
function sched_yield() ::: void;

/** @kphp-extern-func-info interruptible */
function sched_yield_sleep($timeout_ns ::: int) ::: void;
function sched_yield_sleep($duration ::: float) ::: void;

// === Rpc ========================================================================================

Expand Down
4 changes: 1 addition & 3 deletions compiler/code-gen/declarations.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,7 @@ void FunctionDeclaration::compile(CodeGenerator &W) const {
switch (style) {
case gen_out_style::tagger:
case gen_out_style::cpp: {
if (function->is_k2_fork) {
FunctionSignatureGenerator(W) << "task_t<fork_result> " << FunctionName(function) << "(" << params_gen << ")";
} else if (function->is_interruptible) {
if (function->is_interruptible) {
FunctionSignatureGenerator(W) << "task_t<" << ret_type_gen << ">" << " " << FunctionName(function) << "(" << params_gen << ")";
} else {
FunctionSignatureGenerator(W) << ret_type_gen << " " << FunctionName(function) << "(" << params_gen << ")";
Expand Down
6 changes: 2 additions & 4 deletions compiler/code-gen/vertex-compiler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -849,7 +849,7 @@ void compile_func_call(VertexAdaptor<op_func_call> root, CodeGenerator &W, func_

if (mode == func_call_mode::fork_call) {
if (func->is_interruptible) {
W << "(co_await start_fork_t{" << FunctionName(func);
W << "(co_await start_fork_t{static_cast<task_t<void>>(" << FunctionName(func);
} else {
W << FunctionForkName(func);
}
Expand Down Expand Up @@ -883,9 +883,7 @@ void compile_func_call(VertexAdaptor<op_func_call> root, CodeGenerator &W, func_
W << ")";
if (func->is_interruptible) {
if (mode == func_call_mode::fork_call) {
W << ", start_fork_t::execution::fork})";
} else if (func->is_k2_fork) { // k2 fork's return type is 'task_t<fork_result>' so we need to unpack actual result from fork_result
W << ").get_result<" << TypeName(tinf::get_type(root)) << ">()";
W << "), start_fork_t::execution::fork})";
} else {
W << ")";
}
Expand Down
2 changes: 1 addition & 1 deletion runtime-core/utils/small-object-storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
#include <type_traits>
#include <utility>

#include "runtime-core/runtime-core.h"
#include "runtime-core/allocator/runtime-allocator.h"

template<size_t limit>
union small_object_storage {
Expand Down
5 changes: 2 additions & 3 deletions runtime-light/allocator/runtime-light-allocator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,8 @@
#include <cstddef>
#include <cstring>

#include "runtime-core/runtime-core.h"
#include "runtime-core/utils/kphp-assert-core.h"
#include "runtime-light/component/component.h"
#include "runtime-light/utils/panic.h"

namespace {
// TODO: make it depend on max chunk size, e.g. MIN_EXTRA_MEM_SIZE = f(MAX_CHUNK_SIZE);
Expand Down Expand Up @@ -136,4 +135,4 @@ void *RuntimeAllocator::realloc_global_memory(void *mem, size_t new_size, size_t

void RuntimeAllocator::free_global_memory(void *mem, size_t) noexcept {
get_platform_context()->allocator.free(mem);
}
}
20 changes: 13 additions & 7 deletions runtime-light/coroutine/awaitable.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
#include "runtime-light/header.h"
#include "runtime-light/scheduler/scheduler.h"
#include "runtime-light/stdlib/fork/fork-context.h"
#include "runtime-light/stdlib/fork/fork.h"
#include "runtime-light/utils/context.h"

template<class T>
Expand Down Expand Up @@ -277,7 +276,7 @@ class start_fork_t {
SuspendToken suspend_token{std::noop_coroutine(), WaitEvent::Rechedule{}};

public:
explicit start_fork_t(task_t<fork_result> &&task_, execution exec_policy_) noexcept
explicit start_fork_t(task_t<void> task_, execution exec_policy_) noexcept
: exec_policy(exec_policy_)
, fork_coro(task_.get_handle())
, fork_id(ForkComponentContext::get().push_fork(std::move(task_))) {}
Expand Down Expand Up @@ -328,13 +327,16 @@ class start_fork_t {
template<typename T>
class wait_fork_t {
int64_t fork_id;
task_t<fork_result> fork_task;
task_t<fork_result>::awaiter_t fork_awaiter;
task_t<T> fork_task;
task_t<T>::awaiter_t fork_awaiter;

using fork_resume_t = decltype(fork_awaiter.await_resume());
using await_resume_t = fork_resume_t;

public:
explicit wait_fork_t(int64_t fork_id_) noexcept
: fork_id(fork_id_)
, fork_task(ForkComponentContext::get().pop_fork(fork_id))
, fork_task(static_cast<task_t<T>>(ForkComponentContext::get().pop_fork(fork_id)))
, fork_awaiter(std::addressof(fork_task)) {}

wait_fork_t(wait_fork_t &&other) noexcept
Expand All @@ -355,8 +357,12 @@ class wait_fork_t {
fork_awaiter.await_suspend(coro);
}

T await_resume() noexcept {
return fork_awaiter.await_resume().get_result<T>();
await_resume_t await_resume() noexcept {
if constexpr (std::is_void_v<await_resume_t>) {
fork_awaiter.await_resume();
} else {
return fork_awaiter.await_resume();
}
}

constexpr bool resumable() const noexcept {
Expand Down
14 changes: 13 additions & 1 deletion runtime-light/coroutine/task.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#pragma once

#include <cassert>
#include <concepts>
#include <coroutine>
#include <optional>
#include <utility>
Expand Down Expand Up @@ -62,7 +63,6 @@ struct task_t : public task_base_t {
struct promise_void_t;

using promise_type = std::conditional_t<!std::is_void<T>{}, promise_non_void_t<T>, promise_void_t>;

using task_base_t::task_base_t;

struct promise_base_t {
Expand Down Expand Up @@ -225,4 +225,16 @@ struct task_t : public task_base_t {
std::coroutine_handle<promise_type> get_handle() {
return std::coroutine_handle<promise_type>::from_address(handle_address);
}

// conversion functions
//
// erase type
explicit operator task_t<void>() && noexcept {
return task_t<void>{std::coroutine_handle<>::from_address(std::exchange(handle_address, nullptr))};
}
// restore erased type
template<typename U>
requires(std::same_as<void, T>) explicit operator task_t<U>() && noexcept {
return task_t<U>{std::coroutine_handle<>::from_address(std::exchange(handle_address, nullptr))};
}
};
9 changes: 4 additions & 5 deletions runtime-light/stdlib/fork/fork-api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
#include "runtime-light/stdlib/fork/fork-api.h"

#include <chrono>
#include <cstdint>

#include "runtime-core/utils/kphp-assert-core.h"
#include "runtime-light/coroutine/awaitable.h"
Expand All @@ -15,10 +14,10 @@ task_t<void> f$sched_yield() noexcept {
co_await wait_for_reschedule_t{};
}

task_t<void> f$sched_yield_sleep(int64_t duration_ns) noexcept {
if (duration_ns < 0) {
php_warning("can't sleep for negative duration %" PRId64, duration_ns);
task_t<void> f$sched_yield_sleep(double duration) noexcept {
if (duration <= 0) {
php_warning("can't sleep for negative or zero duration %.9f", duration);
co_return;
}
co_await wait_for_timer_t{std::chrono::nanoseconds{static_cast<uint64_t>(duration_ns)}};
co_await wait_for_timer_t{std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::duration<double>{duration})};
}
8 changes: 5 additions & 3 deletions runtime-light/stdlib/fork/fork-api.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@
#pragma once

#include <chrono>
#include <concepts>
#include <cstdint>

#include "runtime-core/core-types/decl/optional.h"
#include "runtime-core/runtime-core.h"
#include "runtime-core/utils/kphp-assert-core.h"
#include "runtime-light/coroutine/awaitable.h"
#include "runtime-light/coroutine/task.h"
Expand All @@ -23,7 +25,7 @@ constexpr auto DEFAULT_TIMEOUT_NS = std::chrono::duration_cast<std::chrono::nano
} // namespace fork_api_impl_

template<typename T>
requires(is_optional<T>::value) task_t<T> f$wait(int64_t fork_id, double timeout = -1.0) noexcept {
requires(is_optional<T>::value || std::same_as<T, mixed>) task_t<T> f$wait(int64_t fork_id, double timeout = -1.0) noexcept {
auto &fork_ctx{ForkComponentContext::get()};
if (!fork_ctx.contains(fork_id)) {
php_warning("can't find fork %" PRId64, fork_id);
Expand All @@ -38,10 +40,10 @@ requires(is_optional<T>::value) task_t<T> f$wait(int64_t fork_id, double timeout
}

template<typename T>
requires(is_optional<T>::value) task_t<T> f$wait(Optional<int64_t> fork_id_opt, double timeout = -1.0) noexcept {
requires(is_optional<T>::value || std::same_as<T, mixed>) task_t<T> f$wait(Optional<int64_t> fork_id_opt, double timeout = -1.0) noexcept {
co_return co_await f$wait<T>(fork_id_opt.has_value() ? fork_id_opt.val() : INVALID_FORK_ID, timeout);
}

task_t<void> f$sched_yield() noexcept;

task_t<void> f$sched_yield_sleep(int64_t duration_ns) noexcept;
task_t<void> f$sched_yield_sleep(double duration) noexcept;
11 changes: 5 additions & 6 deletions runtime-light/stdlib/fork/fork-context.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
#include "runtime-core/memory-resource/unsynchronized_pool_resource.h"
#include "runtime-core/utils/kphp-assert-core.h"
#include "runtime-light/coroutine/task.h"
#include "runtime-light/stdlib/fork/fork.h"
#include "runtime-light/utils/concepts.h"

constexpr int64_t INVALID_FORK_ID = -1;
Expand All @@ -20,15 +19,15 @@ class ForkComponentContext {
using unordered_map = memory_resource::stl::unordered_map<Key, Value, memory_resource::unsynchronized_pool_resource>;

static constexpr auto FORK_ID_INIT = 0;

unordered_map<int64_t, task_t<fork_result>> forks;
// type erased tasks that represent forks
unordered_map<int64_t, task_t<void>> forks;
int64_t next_fork_id{FORK_ID_INIT + 1};

int64_t push_fork(task_t<fork_result> &&task) noexcept {
int64_t push_fork(task_t<void> task) noexcept {
return forks.emplace(next_fork_id, std::move(task)), next_fork_id++;
}

task_t<fork_result> pop_fork(int64_t fork_id) noexcept {
task_t<void> pop_fork(int64_t fork_id) noexcept {
const auto it_fork{forks.find(fork_id)};
if (it_fork == forks.end()) {
php_critical_error("can't find fork %" PRId64, fork_id);
Expand All @@ -44,7 +43,7 @@ class ForkComponentContext {

public:
explicit ForkComponentContext(memory_resource::unsynchronized_pool_resource &memory_resource) noexcept
: forks(unordered_map<int64_t, task_t<fork_result>>::allocator_type{memory_resource}) {}
: forks(unordered_map<int64_t, task_t<void>>::allocator_type{memory_resource}) {}

static ForkComponentContext &get() noexcept;

Expand Down
27 changes: 0 additions & 27 deletions runtime-light/stdlib/fork/fork.h

This file was deleted.

4 changes: 2 additions & 2 deletions runtime-light/stdlib/misc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@

#include <cstdint>

#include "runtime-core/utils/kphp-assert-core.h"
#include "runtime-light/component/component.h"
#include "runtime-light/coroutine/awaitable.h"
#include "runtime-light/header.h"
#include "runtime-light/stdlib/superglobals.h"
#include "runtime-light/streams/streams.h"
#include "runtime-light/utils/context.h"
#include "runtime-light/utils/panic.h"

namespace {

Expand Down Expand Up @@ -60,7 +60,7 @@ task_t<void> finish(int64_t exit_code, bool from_exit) { // TODO: use exit code
const auto ob_total_buffer = ob_merge_buffers();
Response &response = component_ctx.response;
auto &buffer = response.output_buffers[ob_total_buffer];
if (co_await write_all_to_stream(standard_stream, buffer.c_str(), buffer.size())) {
if ((co_await write_all_to_stream(standard_stream, buffer.c_str(), buffer.size())) != buffer.size()) {
php_warning("can't write component result to stream %" PRIu64, standard_stream);
}
component_ctx.release_all_streams();
Expand Down
4 changes: 2 additions & 2 deletions runtime-light/stdlib/rpc/rpc-api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ task_t<RpcQueryInfo> rpc_send_impl(string actor, double timeout, bool ignore_ans
: DEFAULT_TIMEOUT_NS};
// create fork to wait for RPC response. we need to do it even if 'ignore_answer' is 'true' to make sure
// that the stream will not be closed too early. otherwise, platform may even not send RPC request
auto waiter_task{[](int64_t query_id, auto comp_query, std::chrono::nanoseconds timeout, bool collect_responses_extra_info) noexcept -> task_t<fork_result> {
auto waiter_task{[](int64_t query_id, auto comp_query, std::chrono::nanoseconds timeout, bool collect_responses_extra_info) noexcept -> task_t<string> {
auto fetch_task{f$component_client_get_result(std::move(comp_query))};
const auto response{(co_await wait_with_timeout_t{task_t<string>::awaiter_t{std::addressof(fetch_task)}, timeout}).value_or(string{})};
// update response extra info if needed
Expand All @@ -149,7 +149,7 @@ task_t<RpcQueryInfo> rpc_send_impl(string actor, double timeout, bool ignore_ans
co_return response;
}(query_id, std::move(comp_query), timeout_ns, collect_responses_extra_info)};
// start waiter fork
const auto waiter_fork_id{co_await start_fork_t{std::move(waiter_task), start_fork_t::execution::self}};
const auto waiter_fork_id{co_await start_fork_t{static_cast<task_t<void>>(std::move(waiter_task)), start_fork_t::execution::self}};

if (ignore_answer) {
co_return RpcQueryInfo{.id = RPC_IGNORED_ANSWER_QUERY_ID, .request_size = request_size, .timestamp = timestamp};
Expand Down
4 changes: 2 additions & 2 deletions runtime-light/stdlib/string-functions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,14 @@ void f$debug_print_string(const string &s) {
Optional<int64_t> f$byte_to_int(const string &s) {
if (s.size() != 1) {
php_warning("Cannot convert non-byte string to int");
return Optional<int64_t>();
return {};
}
return *s.c_str();
}

Optional<string> f$int_to_byte(int64_t v) {
if (v > 127 || v < -128) {
php_warning("Cannot convert too big int to byte %ld", v);
php_warning("Cannot convert too big int to byte %" PRId64, v);
return false;
}
char c = v;
Expand Down
1 change: 0 additions & 1 deletion runtime-light/stdlib/string-functions.h
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
#pragma once

#include "runtime-core/runtime-core.h"
#include <type_traits>

void print(const char *s, size_t s_len);

Expand Down
3 changes: 1 addition & 2 deletions runtime-light/stdlib/timer/timer.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,9 @@ task_t<void> f$set_timer(int64_t timeout_ms, T &&on_timer_callback) noexcept {
php_warning("can't set timer for negative duration %" PRId64 "ms", timeout_ms);
co_return;
}
const auto fork_f{[](std::chrono::nanoseconds duration, T &&on_timer_callback) -> task_t<fork_result> {
const auto fork_f{[](std::chrono::nanoseconds duration, T &&on_timer_callback) -> task_t<void> {
co_await wait_for_timer_t{duration};
on_timer_callback();
co_return 0;
}}; // TODO: someone should pop that fork from ForkComponentContext since it will stay there unless we perform f$wait on fork
const auto duration_ms{std::chrono::milliseconds{static_cast<uint64_t>(timeout_ms)}};
co_await start_fork_t{fork_f(std::chrono::duration_cast<std::chrono::nanoseconds>(duration_ms), std::forward<T>(on_timer_callback)),
Expand Down
7 changes: 3 additions & 4 deletions runtime-light/utils/panic.h → runtime-light/utils/panic.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,15 @@
// Copyright (c) 2024 LLC «V Kontakte»
// Distributed under the GPL v3 License, see LICENSE.notice.txt

#pragma once

#include <csetjmp>

#include "context.h"
#include "runtime-core/utils/kphp-assert-core.h"
#include "runtime-light/component/component.h"
#include "runtime-light/header.h"
#include "runtime-light/utils/context.h"
#include "runtime-light/utils/logs.h"

inline void critical_error_handler() {
void critical_error_handler() {
constexpr const char *message = "script panic";
const auto &platform_ctx = *get_platform_context();
auto &component_ctx = *get_component_context();
Expand Down
8 changes: 4 additions & 4 deletions runtime-light/utils/php_assert.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,6 @@
// Copyright (c) 2020 LLC «V Kontakte»
// Distributed under the GPL v3 License, see LICENSE.notice.txt

#include "runtime-light/utils/php_assert.h"

#include <algorithm>
#include <csignal>
#include <cstdarg>
#include <cstdio>
Expand All @@ -16,7 +13,10 @@
#include <sys/wait.h>
#include <unistd.h>

#include "runtime-light/utils/panic.h"
#include "runtime-core/utils/kphp-assert-core.h"
#include "runtime-light/header.h"
#include "runtime-light/utils/context.h"
#include "runtime-light/utils/logs.h"

static void php_warning_impl(bool out_of_memory, int error_type, char const *message, va_list args) {
(void)out_of_memory;
Expand Down
Loading

0 comments on commit bd7874b

Please sign in to comment.