Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

implement wait queue #1061

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
Open

implement wait queue #1061

wants to merge 10 commits into from

Conversation

astrophysik
Copy link
Contributor

@astrophysik astrophysik commented Aug 9, 2024

General

add buildin functions from runtime realted to wait fork queue

function wait_queue_create (array<future<any> | false> $request_ids = []) ::: future_queue<^1[*][*]>;
function wait_queue_push (future_queue<any> &$queue_id, future<any> | false $request_ids) ::: void;
function wait_queue_empty (future_queue<any> $queue_id) ::: bool;
/** @kphp-extern-func-info interruptible */
function wait_queue_next (future_queue<any> $queue_id, $timeout ::: float = -1.0) ::: future<^1[*]> | false;

The storage for waiting queues is WaitQueueContext. That class is responsible for queues managing. Class wait_queue_t contains two queues: one for forks, second for awaiters.

Details

@astrophysik astrophysik self-assigned this Aug 9, 2024
@astrophysik astrophysik added k2 k2 related runtime Feature related to runtime labels Aug 9, 2024
@DrDet DrDet requested a review from apolyakov August 9, 2024 13:57
Copy link
Contributor

@apolyakov apolyakov left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In general, I think that PR is based on wrong assumptions. It doesn't actually use cancellation, it's not ready for extensions in the future, e.g. job workers, RPC futures etc.

@@ -46,7 +46,7 @@ class resource_allocator {
}

static constexpr size_t max_value_type_size() {
return 128U;
return 256U;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is it changed?

return true;
}

void await_suspend(std::coroutine_handle<> coro) noexcept {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is something wrong with the design: wait_queue_next_t always waits for timeout before trying to check fork readiness. So, for example, wait_queue_next($id, 1sec) is going to wait 1 second anyway, even if next fork is ready in 10ms.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove class wait_queue_next_t

if (ready_fork.has_value()) {
timer_awaiter.cancel();
// todo set here info about prev fork_id
return ForkComponentContext::get().push_fork(std::move(ready_fork.val().second));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Future ID should not be changed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fix


// ================================================================================================

class wait_queue_next_t {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will it work with job workers? Job workers are also represented as futures, so we can wait on them as well.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fix it, check wait_queue_t

#include "runtime-light/stdlib/fork/fork.h"
#include "runtime-light/utils/concepts.h"

class WaitQueue {
Copy link
Contributor

@apolyakov apolyakov Aug 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks quite strange. We should not work with raw coroutine handles unless we develop an awaitable. In my opinion, WaitQueue can be completely removed. WaitQueueContext can be simplified a lot. Also, keep in mind that wait queues can be used to wait for job workers

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

merge wait_next_t and WaitQueue

@@ -14,6 +14,7 @@

int64_t f$rand() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is it defined in that file?

@astrophysik astrophysik force-pushed the vsadokhov/k2-wait-queue branch 4 times, most recently from c264013 to d74f418 Compare August 14, 2024 14:12
}
std::coroutine_handle<> next_awaiter = awaiters.empty() ? std::noop_coroutine() : awaiters.front().awaited_handle;
std::for_each(forks_ids.begin(), forks_ids.end(), [&](int64_t fork_id) {
task_t<fork_result>::awaiter_t task_awaiter{std::addressof(ForkComponentContext::get().forks[fork_id].first)};
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure about this part. I think should be method in ForkComponentContext for suspend coroutine handle on task_t

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
k2 k2 related runtime Feature related to runtime
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants