Skip to content

Commit

Permalink
K2: add state to start_fork_t, wait_fork_t, and wait_with_timeout_t (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
apolyakov authored Oct 14, 2024
1 parent e4e470f commit 434378e
Showing 1 changed file with 38 additions and 15 deletions.
53 changes: 38 additions & 15 deletions runtime-light/coroutine/awaitable.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,8 @@ class wait_for_update_t : public awaitable_impl_::fork_id_watcher_t {
}

constexpr void await_resume() noexcept {
fork_id_watcher_t::await_resume();
state = awaitable_impl_::State::End;
fork_id_watcher_t::await_resume();
}

bool resumable() const noexcept {
Expand Down Expand Up @@ -145,8 +145,8 @@ class wait_for_incoming_stream_t : awaitable_impl_::fork_id_watcher_t {
}

uint64_t await_resume() noexcept {
fork_id_watcher_t::await_resume();
state = awaitable_impl_::State::End;
fork_id_watcher_t::await_resume();
const auto incoming_stream_d{get_component_context()->take_incoming_stream()};
php_assert(incoming_stream_d != INVALID_PLATFORM_DESCRIPTOR);
return incoming_stream_d;
Expand Down Expand Up @@ -197,8 +197,8 @@ class wait_for_reschedule_t : awaitable_impl_::fork_id_watcher_t {
}

constexpr void await_resume() noexcept {
fork_id_watcher_t::await_resume();
state = awaitable_impl_::State::End;
fork_id_watcher_t::await_resume();
}

bool resumable() const noexcept {
Expand Down Expand Up @@ -257,8 +257,8 @@ class wait_for_timer_t : awaitable_impl_::fork_id_watcher_t {
}

constexpr void await_resume() noexcept {
fork_id_watcher_t::await_resume();
state = awaitable_impl_::State::End;
fork_id_watcher_t::await_resume();
}

bool resumable() const noexcept {
Expand Down Expand Up @@ -287,6 +287,7 @@ class start_fork_t : awaitable_impl_::fork_id_watcher_t {
std::coroutine_handle<> fork_coro;
int64_t fork_id{};
SuspendToken suspend_token{std::noop_coroutine(), WaitEvent::Rechedule{}};
awaitable_impl_::State state{awaitable_impl_::State::Init};

public:
explicit start_fork_t(task_t<void> task_, execution exec_policy_) noexcept
Expand All @@ -298,18 +299,21 @@ class start_fork_t : awaitable_impl_::fork_id_watcher_t {
: exec_policy(other.exec_policy)
, fork_coro(std::exchange(other.fork_coro, std::noop_coroutine()))
, fork_id(std::exchange(other.fork_id, INVALID_FORK_ID))
, suspend_token(std::exchange(other.suspend_token, std::make_pair(std::noop_coroutine(), WaitEvent::Rechedule{}))) {}
, suspend_token(std::exchange(other.suspend_token, std::make_pair(std::noop_coroutine(), WaitEvent::Rechedule{})))
, state(std::exchange(other.state, awaitable_impl_::State::End)) {}

start_fork_t(const start_fork_t &) = delete;
start_fork_t &operator=(const start_fork_t &) = delete;
start_fork_t &operator=(start_fork_t &&) = delete;
~start_fork_t() = default;

constexpr bool await_ready() const noexcept {
php_assert(state == awaitable_impl_::State::Init);
return false;
}

std::coroutine_handle<> await_suspend(std::coroutine_handle<> current_coro) noexcept {
state = awaitable_impl_::State::Suspend;
std::coroutine_handle<> continuation{};
switch (exec_policy) {
case execution::fork: {
Expand All @@ -331,7 +335,8 @@ class start_fork_t : awaitable_impl_::fork_id_watcher_t {
return continuation;
}

int64_t await_resume() const noexcept {
int64_t await_resume() noexcept {
state = awaitable_impl_::State::End;
fork_id_watcher_t::await_resume();
return fork_id;
}
Expand All @@ -344,6 +349,7 @@ class wait_fork_t : awaitable_impl_::fork_id_watcher_t {
int64_t fork_id;
task_t<T> fork_task;
task_t<T>::awaiter_t fork_awaiter;
awaitable_impl_::State state{awaitable_impl_::State::Init};

using fork_resume_t = decltype(fork_awaiter.await_resume());
using await_resume_t = fork_resume_t;
Expand All @@ -357,22 +363,32 @@ class wait_fork_t : awaitable_impl_::fork_id_watcher_t {
wait_fork_t(wait_fork_t &&other) noexcept
: fork_id(std::exchange(other.fork_id, INVALID_FORK_ID))
, fork_task(std::move(other.fork_task))
, fork_awaiter(std::addressof(fork_task)) {}
, fork_awaiter(std::addressof(fork_task))
, state(std::exchange(other.state, awaitable_impl_::State::End)) {}

wait_fork_t(const wait_fork_t &) = delete;
wait_fork_t &operator=(const wait_fork_t &) = delete;
wait_fork_t &operator=(wait_fork_t &&) = delete;
~wait_fork_t() = default;

bool await_ready() const noexcept {
return fork_awaiter.resumable();
~wait_fork_t() {
if (state == awaitable_impl_::State::Suspend) {
cancel();
}
}

bool await_ready() noexcept {
php_assert(state == awaitable_impl_::State::Init);
state = fork_awaiter.resumable() ? awaitable_impl_::State::Ready : awaitable_impl_::State::Init;
return state == awaitable_impl_::State::Ready;
}

constexpr void await_suspend(std::coroutine_handle<> coro) noexcept {
state = awaitable_impl_::State::Suspend;
fork_awaiter.await_suspend(coro);
}

await_resume_t await_resume() noexcept {
state = awaitable_impl_::State::End;
fork_id_watcher_t::await_resume();
if constexpr (std::is_void_v<await_resume_t>) {
fork_awaiter.await_resume();
Expand All @@ -382,10 +398,11 @@ class wait_fork_t : awaitable_impl_::fork_id_watcher_t {
}

constexpr bool resumable() const noexcept {
return fork_awaiter.resumable();
return state == awaitable_impl_::State::Ready || (state == awaitable_impl_::State::Suspend && fork_awaiter.resumable());
}

constexpr void cancel() const noexcept {
constexpr void cancel() noexcept {
state = awaitable_impl_::State::End;
fork_awaiter.cancel();
}
};
Expand All @@ -396,6 +413,7 @@ template<CancellableAwaitable T>
class wait_with_timeout_t {
T awaitable;
wait_for_timer_t timer_awaitable;
awaitable_impl_::State state{awaitable_impl_::State::Init};

static_assert(CancellableAwaitable<wait_for_timer_t>);
static_assert(std::is_void_v<decltype(timer_awaitable.await_suspend(std::coroutine_handle<>{}))>);
Expand All @@ -411,15 +429,18 @@ class wait_with_timeout_t {

wait_with_timeout_t(wait_with_timeout_t &&other) noexcept
: awaitable(std::move(other.awaitable))
, timer_awaitable(std::move(other.timer_awaitable)) {}
, timer_awaitable(std::move(other.timer_awaitable))
, state(std::exchange(other.state, awaitable_impl_::State::End)) {}

wait_with_timeout_t(const wait_with_timeout_t &) = delete;
wait_with_timeout_t &operator=(const wait_with_timeout_t &) = delete;
wait_with_timeout_t &operator=(wait_with_timeout_t &&) = delete;
~wait_with_timeout_t() = default;

constexpr bool await_ready() const noexcept {
return awaitable.await_ready() || timer_awaitable.await_ready();
constexpr bool await_ready() noexcept {
php_assert(state == awaitable_impl_::State::Init);
state = awaitable.await_ready() || timer_awaitable.await_ready() ? awaitable_impl_::State::Ready : awaitable_impl_::State::Init;
return state == awaitable_impl_::State::Ready;
}

// according to C++ standard, there can be 3 possible cases:
Expand All @@ -431,6 +452,7 @@ class wait_with_timeout_t {
await_suspend_return_t await_suspend(std::coroutine_handle<> coro) noexcept {
// as we don't rely on coroutine scheduler implementation, let's always suspend awaitable first. in case of some smart scheduler
// it won't have any effect, but it will have an effect if our scheduler is quite simple.
state = awaitable_impl_::State::Suspend;
if constexpr (std::is_void_v<await_suspend_return_t>) {
awaitable.await_suspend(coro);
timer_awaitable.await_suspend(coro);
Expand All @@ -442,6 +464,7 @@ class wait_with_timeout_t {
}

await_resume_return_t await_resume() noexcept {
state = awaitable_impl_::State::End;
if (awaitable.resumable()) {
timer_awaitable.cancel();
if constexpr (!std::is_void_v<await_resume_return_t>) {
Expand Down

0 comments on commit 434378e

Please sign in to comment.