Skip to content

Commit

Permalink
Rework K2 stream API (#1077)
Browse files Browse the repository at this point in the history
  • Loading branch information
apolyakov authored Aug 27, 2024
1 parent 95207c8 commit a6b6d89
Show file tree
Hide file tree
Showing 42 changed files with 556 additions and 846 deletions.
43 changes: 9 additions & 34 deletions builtin-functions/kphp-light/functions.txt
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ function strval ($v ::: mixed) ::: string;
/** @kphp-extern-func-info interruptible */
function exit($code = 0) ::: void;

/** @kphp-extern-func-info interruptible */
function die($code = 0) ::: void;

function ob_clean() ::: void;
Expand Down Expand Up @@ -76,13 +77,15 @@ function get_hash_of_class (object $klass) ::: int;

function strlen ($str ::: string) ::: int;

// === Fork =======================================================================================
// === Future =====================================================================================

function get_running_fork_id() ::: future <void>;

/** @kphp-extern-func-info interruptible cpp_template_call */
function wait(future<any> | false $id, float $timeout = -1.0) ::: ^1[*] | null;

// === Fork =======================================================================================

/** @kphp-extern-func-info interruptible */
function sched_yield() ::: void;

Expand Down Expand Up @@ -141,42 +144,19 @@ class ComponentQuery {
}

/** @kphp-extern-func-info interruptible */
function component_get_http_query() ::: void;
function component_client_send_request($name ::: string, $message ::: string) ::: ComponentQuery;

/** @kphp-extern-func-info interruptible */
function component_client_send_query($name ::: string, $message ::: string) ::: ComponentQuery;
/** @kphp-extern-func-info interruptible */
function component_client_get_result($query ::: ComponentQuery) ::: string;
function component_client_fetch_response($query ::: ComponentQuery) ::: string;

/** @kphp-extern-func-info interruptible */
function component_server_get_query() ::: string;
/** @kphp-extern-func-info interruptible */
function component_server_send_result($message ::: string) ::: void;

class ComponentStream {
private function __construct() ::: \ComponentStream;
function component_server_accept_query() ::: ComponentQuery;

public function is_read_closed() ::: bool;
public function is_write_closed() ::: bool;
public function is_please_shutdown_write() ::: bool;

public function shutdown_write() ::: void;
public function please_shutdown_write() ::: void;
}

function component_open_stream($name ::: string) ::: ComponentStream;
/** @kphp-extern-func-info interruptible */
function component_accept_stream() ::: ComponentStream;
function component_server_fetch_request($query ::: ComponentQuery) ::: string;

function component_stream_write_nonblock($stream ::: ComponentStream, $message ::: string) ::: int;
function component_stream_read_nonblock($stream ::: ComponentStream) ::: string;
/** @kphp-extern-func-info interruptible */
function component_stream_write_exact($stream ::: ComponentStream, $message ::: string) ::: int;
/** @kphp-extern-func-info interruptible */
function component_stream_read_exact($stream ::: ComponentStream, $len ::: int) ::: string;

function component_close_stream($stream ::: ComponentStream) ::: void;
function component_finish_stream_processing($stream ::: ComponentStream) ::: void;
function component_server_send_response($query ::: ComponentQuery, $message ::: string) ::: void;

// === Json =======================================================================================

Expand Down Expand Up @@ -217,11 +197,6 @@ function warning($message ::: string) ::: void;
/** @kphp-no-return */
function critical_error($message ::: string) ::: void;

function debug_print_string($str ::: string) ::: void;

function byte_to_int($str ::: string) ::: ?int;
function int_to_byte($v ::: int) ::: ?string;

/** @kphp-extern-func-info interruptible */
function set_timer(int $timeout, callable():void $callback) ::: void;

Expand Down
4 changes: 2 additions & 2 deletions compiler/code-gen/files/init-scripts.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,8 @@ struct RunInterruptedFunction {
* 1) Start when the request came in
* 2) Collecting output buffer after script finished
**/
std::string script_start = G->settings().k2_component_is_oneshot.get() ? "co_await f$component_get_http_query();" : "";
std::string script_finish = G->settings().k2_component_is_oneshot.get() ? "co_await finish(0, false);" : "";
std::string script_start = G->settings().k2_component_is_oneshot.get() ? "co_await accept_initial_stream();" : "";
std::string script_finish = G->settings().k2_component_is_oneshot.get() ? "co_await shutdown_script();" : "";
FunctionSignatureGenerator(W) << "task_t<void> " << FunctionName(function) << "$run() " << BEGIN
<< script_start << NL
<< await_prefix << FunctionName(function) << "();" << NL
Expand Down
2 changes: 1 addition & 1 deletion runtime-light/component/component.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
#include "runtime-light/header.h"
#include "runtime-light/scheduler/scheduler.h"
#include "runtime-light/stdlib/fork/fork-context.h"
#include "runtime-light/stdlib/output-control.h"
#include "runtime-light/stdlib/output/output-buffer.h"
#include "runtime-light/stdlib/rpc/rpc-context.h"

constexpr uint64_t INVALID_PLATFORM_DESCRIPTOR = 0;
Expand Down
74 changes: 74 additions & 0 deletions runtime-light/stdlib/component/component-api.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// Compiler for PHP (aka KPHP)
// Copyright (c) 2024 LLC «V Kontakte»
// Distributed under the GPL v3 License, see LICENSE.notice.txt

#include "runtime-light/stdlib/component/component-api.h"

#include <cstdint>

#include "runtime-core/runtime-core.h"
#include "runtime-core/utils/kphp-assert-core.h"
#include "runtime-light/component/component.h"
#include "runtime-light/coroutine/awaitable.h"
#include "runtime-light/coroutine/task.h"
#include "runtime-light/streams/streams.h"
#include "runtime-light/utils/context.h"

// === component query client interface ===========================================================

task_t<class_instance<C$ComponentQuery>> f$component_client_send_request(string name, string message) noexcept {
const auto stream_d{get_component_context()->open_stream(name)};
if (stream_d == INVALID_PLATFORM_DESCRIPTOR) {
co_return class_instance<C$ComponentQuery>{};
}

int32_t written{co_await write_all_to_stream(stream_d, message.c_str(), message.size())};
if (written != message.size()) {
php_warning("can't send request to component '%s'", name.c_str());
co_return class_instance<C$ComponentQuery>{};
}

get_platform_context()->shutdown_write(stream_d);
php_debug("sent %d bytes from %d to '%s' on stream %" PRIu64, written, message.size(), name.c_str(), stream_d);
co_return make_instance<C$ComponentQuery>(stream_d);
}

task_t<string> f$component_client_fetch_response(class_instance<C$ComponentQuery> query) noexcept {
uint64_t stream_d{query.is_null() ? INVALID_PLATFORM_DESCRIPTOR : query.get()->stream_d};
if (stream_d == INVALID_PLATFORM_DESCRIPTOR) {
php_warning("can't fetch component response from stream %" PRIu64, stream_d);
co_return string{};
}

const auto [buffer, size]{co_await read_all_from_stream(stream_d)};
string result{buffer, static_cast<string::size_type>(size)};
get_platform_context()->allocator.free(buffer);
php_debug("read %d bytes from stream %" PRIu64, size, stream_d);
get_component_context()->release_stream(stream_d);
query.get()->stream_d = INVALID_PLATFORM_DESCRIPTOR;
co_return result;
}

// === component query server interface ===========================================================

task_t<class_instance<C$ComponentQuery>> f$component_server_accept_query() noexcept {
co_return make_instance<C$ComponentQuery>(co_await wait_for_incoming_stream_t{});
}

task_t<string> f$component_server_fetch_request(class_instance<C$ComponentQuery> query) noexcept {
uint64_t stream_d{query.is_null() ? INVALID_PLATFORM_DESCRIPTOR : query.get()->stream_d};
const auto [buffer, size]{co_await read_all_from_stream(stream_d)};
string result{buffer, static_cast<string::size_type>(size)};
get_platform_context()->allocator.free(buffer);
co_return result;
}

task_t<void> f$component_server_send_response(class_instance<C$ComponentQuery> query, string message) noexcept {
uint64_t stream_d{query.is_null() ? INVALID_PLATFORM_DESCRIPTOR : query.get()->stream_d};
if ((co_await write_all_to_stream(stream_d, message.c_str(), message.size())) != message.size()) {
php_warning("can't send component response to stream %" PRIu64, stream_d);
} else {
php_debug("sent %d bytes as response to stream %" PRIu64, message.size(), stream_d);
}
get_component_context()->release_stream(stream_d);
}
59 changes: 59 additions & 0 deletions runtime-light/stdlib/component/component-api.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Compiler for PHP (aka KPHP)
// Copyright (c) 2024 LLC «V Kontakte»
// Distributed under the GPL v3 License, see LICENSE.notice.txt

#pragma once

#include <cstdint>
#include <string_view>
#include <utility>

#include "runtime-core/class-instance/refcountable-php-classes.h"
#include "runtime-core/runtime-core.h"
#include "runtime-light/component/component.h"
#include "runtime-light/coroutine/task.h"
#include "runtime-light/utils/context.h"

// === ComponentQuery =============================================================================

struct C$ComponentQuery final : public refcountable_php_classes<C$ComponentQuery> {
uint64_t stream_d{INVALID_PLATFORM_DESCRIPTOR};

explicit constexpr C$ComponentQuery(uint64_t stream_d_) noexcept
: stream_d(stream_d_) {}
constexpr C$ComponentQuery(C$ComponentQuery &&other) noexcept
: stream_d(std::exchange(other.stream_d, INVALID_PLATFORM_DESCRIPTOR)) {};

C$ComponentQuery(const C$ComponentQuery &) = delete;
C$ComponentQuery &operator=(const C$ComponentQuery &) = delete;
C$ComponentQuery &operator=(C$ComponentQuery &&other) = delete;

constexpr const char *get_class() const noexcept {
return "ComponentQuery";
}

constexpr int32_t get_hash() const noexcept {
return static_cast<int32_t>(std::hash<std::string_view>{}(get_class()));
}

~C$ComponentQuery() {
auto &component_ctx{*get_component_context()};
if (component_ctx.opened_streams().contains(stream_d)) {
component_ctx.release_stream(stream_d);
}
}
};

// === component query client interface ===========================================================

task_t<class_instance<C$ComponentQuery>> f$component_client_send_request(string name, string message) noexcept;

task_t<string> f$component_client_fetch_response(class_instance<C$ComponentQuery> query) noexcept;

// === component query server interface ===========================================================

task_t<class_instance<C$ComponentQuery>> f$component_server_accept_query() noexcept;

task_t<string> f$component_server_fetch_request(class_instance<C$ComponentQuery> query) noexcept;

task_t<void> f$component_server_send_response(class_instance<C$ComponentQuery> query, string message) noexcept;
65 changes: 65 additions & 0 deletions runtime-light/stdlib/exit/exit-functions.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// Compiler for PHP (aka KPHP)
// Copyright (c) 2024 LLC «V Kontakte»
// Distributed under the GPL v3 License, see LICENSE.notice.txt

#include "runtime-light/stdlib/exit/exit-functions.h"

#include <cstdint>

#include "runtime-core/utils/kphp-assert-core.h"
#include "runtime-light/component/component.h"
#include "runtime-light/header.h"
#include "runtime-light/streams/streams.h"
#include "runtime-light/utils/context.h"

namespace {

int32_t ob_merge_buffers() noexcept {
Response &response{get_component_context()->response};
php_assert(response.current_buffer >= 0);

int32_t ob_first_not_empty{};
while (ob_first_not_empty < response.current_buffer && response.output_buffers[ob_first_not_empty].size() == 0) {
++ob_first_not_empty;
}
for (auto i = ob_first_not_empty + 1; i <= response.current_buffer; i++) {
response.output_buffers[ob_first_not_empty].append(response.output_buffers[i].c_str(), response.output_buffers[i].size());
}
return ob_first_not_empty;
}

} // namespace

task_t<void> shutdown_script() noexcept {
auto &component_ctx{*get_component_context()};
const auto standard_stream{component_ctx.standard_stream()};
if (standard_stream == INVALID_PLATFORM_DESCRIPTOR) {
component_ctx.poll_status = PollStatus::PollFinishedError;
co_return;
}

const auto &buffer{component_ctx.response.output_buffers[ob_merge_buffers()]};
if ((co_await write_all_to_stream(standard_stream, buffer.buffer(), buffer.size())) != buffer.size()) {
php_warning("can't write component result to stream %" PRIu64, standard_stream);
}
}

task_t<void> f$exit(const mixed &v) noexcept { // TODO: make it synchronous
int64_t exit_code{};
if (v.is_string()) {
Response &response{get_component_context()->response};
response.output_buffers[response.current_buffer] << v;
} else if (v.is_int()) {
int64_t v_code{v.to_int()};
// valid PHP exit codes: [0..254]
exit_code = v_code >= 0 && v_code <= 254 ? v_code : 1;
} else {
exit_code = 1;
}
co_await shutdown_script();
auto &component_ctx{*get_component_context()};
component_ctx.poll_status =
component_ctx.poll_status != PollStatus::PollFinishedError && exit_code == 0 ? PollStatus::PollFinishedOk : PollStatus::PollFinishedError;
component_ctx.release_all_streams();
get_platform_context()->abort();
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@
#include "runtime-core/runtime-core.h"
#include "runtime-light/coroutine/task.h"

int64_t f$rand();
task_t<void> shutdown_script() noexcept;

template<class T>
T f$make_clone(const T &x) {
return x;
task_t<void> f$exit(const mixed &v = 0) noexcept;

inline task_t<void> f$die(const mixed &v = 0) noexcept {
co_await f$exit(v);
}
23 changes: 0 additions & 23 deletions runtime-light/stdlib/fork/fork-api.cpp

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ constexpr auto DEFAULT_TIMEOUT_NS = std::chrono::duration_cast<std::chrono::nano

} // namespace fork_api_impl_


// === Blocking API ================================================================================

template<typename T>
Expand All @@ -47,12 +46,20 @@ requires(is_optional<T>::value || std::same_as<T, mixed>) task_t<T> f$wait(Optio
co_return co_await f$wait<T>(fork_id_opt.has_value() ? fork_id_opt.val() : INVALID_FORK_ID, timeout);
}

inline task_t<void> f$sched_yield() noexcept {
co_await wait_for_reschedule_t{};
}

inline task_t<void> f$sched_yield_sleep(double duration) noexcept {
if (duration <= 0) {
php_warning("can't sleep for negative or zero duration %.9f", duration);
co_return;
}
co_await wait_for_timer_t{std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::duration<double>{duration})};
}

// === Non-blocking API ============================================================================

inline int64_t f$get_running_fork_id() noexcept {
return ForkComponentContext::get().running_fork_id;
}

task_t<void> f$sched_yield() noexcept;

task_t<void> f$sched_yield_sleep(double duration) noexcept;
19 changes: 0 additions & 19 deletions runtime-light/stdlib/interface.cpp

This file was deleted.

Loading

0 comments on commit a6b6d89

Please sign in to comment.