Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add cancellation support #402

Open
wants to merge 3 commits into
base: main
Choose a base branch
from

Conversation

poor-circle
Copy link
Contributor

@poor-circle poor-circle commented Nov 15, 2024

Why

CancellationSignal

A CancellationSignal can be shared among multiple coroutines, and when a coroutine is started, it can be bound to the signal. The lifetime of the signal will be extended until the registered coroutine has finished running. A signal can only be triggered once.

Lazy<void> work();

std::shared_ptr<CancellationSignal> signal = CancellationSignal::create();
work().via(ex).setCancellation(signal.get()).detach();
work().via(ex).setCancellation(signal.get()).detach();
work().via(ex).setCancellation(signal.get()).detach();
signal.emit(CancellationType::terminal);

CancellationSignal has such interface:

enum class CancellationType : uint64_t {
    none = 0,
                                                             // low bits reserve for other cancellation type
    terminal = 0x4000'0000'0000'0000, // CancellationType:terminal 
    all = 0x7fff'ffff'ffff'ffff, 
};
class CancellationSignal
    : public std::enable_shared_from_this<CancellationSignal> {
public:
    //  Thread-safe. Emit a signal, return false when emit twice.
    bool emit(CancellationType state) noexcept;
    //  Thread-safe. Get cancellation type of signal,
    CancellationType state() const noexcept;
    //  create signal.
    static std::shared_ptr<CancellationSignal> create();
};

CancellationSlot

If a coroutine registers a cancellation signal when it starts, it can obtain a CancellationSlot for response. When the coroutine is called through co_await, the CancellationSlot will be automatically propagated. The lifetime of the CancellationSlot will be extended until the coroutine ends.

The propagation of the signal is as follows:

foo().via(ex).cancelWith(signal).start([](auto&&){});

Lazy<void> bar(int i);

Lazy<void> foo() {
    auto slot = co_await currentCancelSlot{};
    // no propagation
    bar(0).start([](auto&&){})
    // manually auto propagation
    bar(1).via(ex).cancelWith(slot->signal()).start([](auto&&){});  
    co_await bar(2); // auto propagation
}

In the above task: If the signal is triggered, then foo(), bar(1) and bar(2) will receive the signal, while bar(0) will not receive the signal.

Users can obtain the current cancellation slot of a coroutine through co_await currentCancellationSlot{}; if the coroutine is not bound to a signal, it will return nullptr. The canceled() method can be called to check if the current coroutine has been canceled. A signal handling function can be registered using the emplace() method.

The signal handling function will be executed by the thread that initiates the cancel signal, and users need to ensure the thread safety of this function themselves. We guarantee that the signal handling function will be triggered at most once.

Lazy<bool> sleep_1s() {
    CancellationSlot* slot = co_await currentCancellationSlot{};
    auto p = std::make_unique<std::promise<void>>();
    auto waiter = p.get_future();
    if (slot) {
        if (!slot->emplace([p=std::move(p)](CancellationType type){
            p.notity();
        })) {
            // signal has triggered
            co_return false;
        }
    }
    if (waiter.wait_for(1s)==std::future_status::timeout) {
        co_return true;
    }
}
Lazy<void> yield_until_canceled() {
    CancellationSlot* slot = co_await currentCancellationSlot{};
    while(true) {
        if (slot && slot->canceled()) {
            co_return;
        }
        co_await Yield{};
    }
}

The public interface of CancellationSlot is as follows. Unlike signals that are shared across multiple coroutines in a thread-safe manner, different coroutines will hold different slots. Therefore, we prohibit concurrent calls to the same slot, as it is not thread-safe.

class CancellationSlot {
    using CancellationHandler = util::move_only_function<void(CancellationType)>;
    // Register a signal handler. Returns false if the cancellation signal has already been triggered.
    template <typename... Args>
    [[nodiscard]] bool emplace(Args&&... args);
    // Clear the signal handler. If a null pointer is returned, it indicates that the signal handler has been executed or that no signal has been registered yet.
    void* clear();
    // Filter signals within the specified scope. If signal type & filter is 0, then the signal type will not be triggered within this scope.
    // Nested filters are allowed.
    [[nodiscard]] FilterGuard addScopedFilter(CancellationType filter);
    // Get the current scope's filter.
    CancellationType getFilter();
    // Get the current cancellation signal status.
    CancellationType state() const noexcept;
    // Check whether the filtered cancellation signal is in a triggered state.
    bool canceled() const noexcept;
    // The slot holds ownership of the corresponding signal, so the signal's lifetime is always longer than the slot's.
    // To extend the signal's lifetime, you can call signal()->shared_from_this(),
    // or start a new coroutine with the signal.
    CancellationSignal* signal() const noexcept;
    // This slot will ignore all subsequent cancellation signals.
    void forbidCancellation() noexcept;
};

Collect and structured concurrency

The signal and slot mechanism belongs to a low-level API and is generally used for users to implement their own asynchronous components and very complex asynchronous logic. In general code, we recommend using collectAnyand collectAll to implement structured concurrency and cancellation logic. For example, when handling timeouts in the code, we only need to do it like this:

auto result = co_await collectAny(work(), sleep(1s));
if (result.index() == 1) {
    // timed out
}

The collectAny function will internally create a new signal and bind the coroutines in the parameter list to that signal. When the first task finishes, the signal will be triggered.

We can set the cancellation signal type for collectAny through template parameters.

// work wont be canceled
co_await collectAny<CancellationType::none>(work(), sleep(1s));
// work will be canceled by signal CancellationType::terminal
co_await collectAny<CancellationType::terminal>(work(), sleep(1s));

If the coroutine bound to collectAny is tied to a cancellation signal, then when the signal is triggered, collectAny will immediately terminate and throw an exception of type std::system_error (error code: std::errc::operation_canceled). Additionally, the cancellation signal will be forwarded and passed to all coroutines started by collectAny.

Similarly, collectAll also supports triggering a cancellation signal upon the completion of the first task. The differences between collectAll and collectAny are as follows:

  1. collectAll waits for all coroutines to finish executing, while collectAny returns immediately after triggering the cancellation signal.
  2. When collectAll receives a cancellation signal from the upper layer, it does not throw an exception but will wait for all coroutines to complete their execution. (However, the cancellation signal will still be forwarded to the coroutines started by collectAll.)

Executor Cancellation

We can try to request the executor to cancel a specific task, but whether the cancellation is successful depends on the implementation of the executor. The schedule interface has been updated to include a parameter, allowing the passing of a cancellationSlot, which enables the possibility of canceling tasks that are currently scheduled.

virtual void Executor::schedule(Func func, uint64_t schedule_info, cancellation_slot* slot);
virtual void Executor::scheduleWithCancel(Func func, Duration dur, uint64_t schedule_info,  cancellation_slot* slot);

Awaitor Cancellation

Cancellation is collaborative, and the correct response to a cancellation signal requires each Awaitor IO object to properly implement cancellation logic. async_simple's Awaitors will throw a std::system_error exception when they are canceled.

The following IO objects exist in async_simple:

  1. CollectAny/CollectAll
  2. Yield
  3. Sleep
  4. SpinLock
  5. Mutex
  6. SharedMutex
  7. Latch
  8. ConditionVariable
  9. Future
  10. CountingSemaphore

The Collect* have implemented cancellation. Yield, Sleep, and SpinLock depend on whether the scheduler implements cancellation (SimpleExecutor has support cancel sleep). Even if the executor does not support cancellation, Yield and Sleep will insert checkpoints for cancellation during await and resume.

uthread

uthread dont support cancellation now, but The design of signals and slots is independent, so stack coroutines could also support cancellation later. The related design needs to be discussed.

What is changing

Breakchanges:

  1. Components such as Yield and spinlock may throw exceptions.
  2. The return values of CollectAll/CollectAny have been changed to lazy, and the callback versions of collectAll/collectAny have been removed.
  3. By default, collectAny will trigger a cancellation signal.

TODO

add document & more test later.

@poor-circle poor-circle changed the title Add cancellation support. Add cancellation support Nov 15, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant