diff --git a/runtime-light/coroutine/awaitable.h b/runtime-light/coroutine/awaitable.h index 2d5f04f256..17d4e7cad4 100644 --- a/runtime-light/coroutine/awaitable.h +++ b/runtime-light/coroutine/awaitable.h @@ -95,8 +95,8 @@ class wait_for_update_t : public awaitable_impl_::fork_id_watcher_t { } constexpr void await_resume() noexcept { - fork_id_watcher_t::await_resume(); state = awaitable_impl_::State::End; + fork_id_watcher_t::await_resume(); } bool resumable() const noexcept { @@ -145,8 +145,8 @@ class wait_for_incoming_stream_t : awaitable_impl_::fork_id_watcher_t { } uint64_t await_resume() noexcept { - fork_id_watcher_t::await_resume(); state = awaitable_impl_::State::End; + fork_id_watcher_t::await_resume(); const auto incoming_stream_d{get_component_context()->take_incoming_stream()}; php_assert(incoming_stream_d != INVALID_PLATFORM_DESCRIPTOR); return incoming_stream_d; @@ -197,8 +197,8 @@ class wait_for_reschedule_t : awaitable_impl_::fork_id_watcher_t { } constexpr void await_resume() noexcept { - fork_id_watcher_t::await_resume(); state = awaitable_impl_::State::End; + fork_id_watcher_t::await_resume(); } bool resumable() const noexcept { @@ -257,8 +257,8 @@ class wait_for_timer_t : awaitable_impl_::fork_id_watcher_t { } constexpr void await_resume() noexcept { - fork_id_watcher_t::await_resume(); state = awaitable_impl_::State::End; + fork_id_watcher_t::await_resume(); } bool resumable() const noexcept { @@ -287,6 +287,7 @@ class start_fork_t : awaitable_impl_::fork_id_watcher_t { std::coroutine_handle<> fork_coro; int64_t fork_id{}; SuspendToken suspend_token{std::noop_coroutine(), WaitEvent::Rechedule{}}; + awaitable_impl_::State state{awaitable_impl_::State::Init}; public: explicit start_fork_t(task_t task_, execution exec_policy_) noexcept @@ -298,7 +299,8 @@ class start_fork_t : awaitable_impl_::fork_id_watcher_t { : exec_policy(other.exec_policy) , fork_coro(std::exchange(other.fork_coro, std::noop_coroutine())) , fork_id(std::exchange(other.fork_id, INVALID_FORK_ID)) - , suspend_token(std::exchange(other.suspend_token, std::make_pair(std::noop_coroutine(), WaitEvent::Rechedule{}))) {} + , suspend_token(std::exchange(other.suspend_token, std::make_pair(std::noop_coroutine(), WaitEvent::Rechedule{}))) + , state(std::exchange(other.state, awaitable_impl_::State::End)) {} start_fork_t(const start_fork_t &) = delete; start_fork_t &operator=(const start_fork_t &) = delete; @@ -306,10 +308,12 @@ class start_fork_t : awaitable_impl_::fork_id_watcher_t { ~start_fork_t() = default; constexpr bool await_ready() const noexcept { + php_assert(state == awaitable_impl_::State::Init); return false; } std::coroutine_handle<> await_suspend(std::coroutine_handle<> current_coro) noexcept { + state = awaitable_impl_::State::Suspend; std::coroutine_handle<> continuation{}; switch (exec_policy) { case execution::fork: { @@ -331,7 +335,8 @@ class start_fork_t : awaitable_impl_::fork_id_watcher_t { return continuation; } - int64_t await_resume() const noexcept { + int64_t await_resume() noexcept { + state = awaitable_impl_::State::End; fork_id_watcher_t::await_resume(); return fork_id; } @@ -344,6 +349,7 @@ class wait_fork_t : awaitable_impl_::fork_id_watcher_t { int64_t fork_id; task_t fork_task; task_t::awaiter_t fork_awaiter; + awaitable_impl_::State state{awaitable_impl_::State::Init}; using fork_resume_t = decltype(fork_awaiter.await_resume()); using await_resume_t = fork_resume_t; @@ -357,22 +363,32 @@ class wait_fork_t : awaitable_impl_::fork_id_watcher_t { wait_fork_t(wait_fork_t &&other) noexcept : fork_id(std::exchange(other.fork_id, INVALID_FORK_ID)) , fork_task(std::move(other.fork_task)) - , fork_awaiter(std::addressof(fork_task)) {} + , fork_awaiter(std::addressof(fork_task)) + , state(std::exchange(other.state, awaitable_impl_::State::End)) {} wait_fork_t(const wait_fork_t &) = delete; wait_fork_t &operator=(const wait_fork_t &) = delete; wait_fork_t &operator=(wait_fork_t &&) = delete; - ~wait_fork_t() = default; - bool await_ready() const noexcept { - return fork_awaiter.resumable(); + ~wait_fork_t() { + if (state == awaitable_impl_::State::Suspend) { + cancel(); + } + } + + bool await_ready() noexcept { + php_assert(state == awaitable_impl_::State::Init); + state = fork_awaiter.resumable() ? awaitable_impl_::State::Ready : awaitable_impl_::State::Init; + return state == awaitable_impl_::State::Ready; } constexpr void await_suspend(std::coroutine_handle<> coro) noexcept { + state = awaitable_impl_::State::Suspend; fork_awaiter.await_suspend(coro); } await_resume_t await_resume() noexcept { + state = awaitable_impl_::State::End; fork_id_watcher_t::await_resume(); if constexpr (std::is_void_v) { fork_awaiter.await_resume(); @@ -382,10 +398,11 @@ class wait_fork_t : awaitable_impl_::fork_id_watcher_t { } constexpr bool resumable() const noexcept { - return fork_awaiter.resumable(); + return state == awaitable_impl_::State::Ready || (state == awaitable_impl_::State::Suspend && fork_awaiter.resumable()); } - constexpr void cancel() const noexcept { + constexpr void cancel() noexcept { + state = awaitable_impl_::State::End; fork_awaiter.cancel(); } }; @@ -396,6 +413,7 @@ template class wait_with_timeout_t { T awaitable; wait_for_timer_t timer_awaitable; + awaitable_impl_::State state{awaitable_impl_::State::Init}; static_assert(CancellableAwaitable); static_assert(std::is_void_v{}))>); @@ -411,15 +429,18 @@ class wait_with_timeout_t { wait_with_timeout_t(wait_with_timeout_t &&other) noexcept : awaitable(std::move(other.awaitable)) - , timer_awaitable(std::move(other.timer_awaitable)) {} + , timer_awaitable(std::move(other.timer_awaitable)) + , state(std::exchange(other.state, awaitable_impl_::State::End)) {} wait_with_timeout_t(const wait_with_timeout_t &) = delete; wait_with_timeout_t &operator=(const wait_with_timeout_t &) = delete; wait_with_timeout_t &operator=(wait_with_timeout_t &&) = delete; ~wait_with_timeout_t() = default; - constexpr bool await_ready() const noexcept { - return awaitable.await_ready() || timer_awaitable.await_ready(); + constexpr bool await_ready() noexcept { + php_assert(state == awaitable_impl_::State::Init); + state = awaitable.await_ready() || timer_awaitable.await_ready() ? awaitable_impl_::State::Ready : awaitable_impl_::State::Init; + return state == awaitable_impl_::State::Ready; } // according to C++ standard, there can be 3 possible cases: @@ -431,6 +452,7 @@ class wait_with_timeout_t { await_suspend_return_t await_suspend(std::coroutine_handle<> coro) noexcept { // as we don't rely on coroutine scheduler implementation, let's always suspend awaitable first. in case of some smart scheduler // it won't have any effect, but it will have an effect if our scheduler is quite simple. + state = awaitable_impl_::State::Suspend; if constexpr (std::is_void_v) { awaitable.await_suspend(coro); timer_awaitable.await_suspend(coro); @@ -442,6 +464,7 @@ class wait_with_timeout_t { } await_resume_return_t await_resume() noexcept { + state = awaitable_impl_::State::End; if (awaitable.resumable()) { timer_awaitable.cancel(); if constexpr (!std::is_void_v) {