Skip to content

Commit

Permalink
add cancellation signal/slot, add cancellation support for sleep/coll…
Browse files Browse the repository at this point in the history
…ect*/schedule
  • Loading branch information
poor-circle committed Nov 15, 2024
1 parent af5bcc8 commit a1610ed
Show file tree
Hide file tree
Showing 10 changed files with 774 additions and 485 deletions.
264 changes: 264 additions & 0 deletions async_simple/Cancellation.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,264 @@
/*
* Copyright (c) 2024, Alibaba Group Holding Limited;
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef ASYNC_SIMPLE_CANCELLATION_H
#define ASYNC_SIMPLE_CANCELLATION_H

#ifndef ASYNC_SIMPLE_USE_MODULES

#include <assert.h>
#include <atomic>
#include <cstdint>
#include <memory>
#include <system_error>
#include <utility>

#include "async_simple/Common.h"
#include "util/move_only_function.h"
#endif // ASYNC_SIMPLE_USE_MODULES

namespace async_simple {

enum class CancellationType : uint64_t {
none = 0,
// low bits reserve for low level cancellation
terminal = 0x4000'0000'0000'0000, // max level of cancellation
all = 0x7fff'ffff'ffff'ffff,
};

class CancellationSlot;

class CancellationSignal
: public std::enable_shared_from_this<CancellationSignal> {
struct PrivateConstructTag {};
struct Node {
CancellationSlot* _slot;
Node* _next;
};
static inline Node signalEmittedTag{};
void registSlot(CancellationSlot* slot);

public:
CancellationSignal(PrivateConstructTag){};
bool emit(CancellationType state) noexcept;
friend class CancellationSlot;
CancellationType state() const noexcept {
return _state.load(std::memory_order_acquire);
}
static std::shared_ptr<CancellationSignal> create() {
return std::make_shared<CancellationSignal>(PrivateConstructTag{});
}
~CancellationSignal();

private:
std::atomic<CancellationType> _state;
std::atomic<Node*> _slotsHead;
};

class CancellationSlot {
using CancellationHandler =
util::move_only_function<void(CancellationType)>;

public:
CancellationSlot(CancellationSignal* signal,
CancellationType filter = CancellationType::all)
: _signal(signal->shared_from_this()), _filter(filter) {
signal->registSlot(this);
};
~CancellationSlot() { delete _handler.load(std::memory_order_acquire); }

static constexpr CancellationType forbidden =
static_cast<CancellationType>(0x8000'0000'0000'0000);

public:
// Register a signal handler. Returns false if the cancellation signal has
// already been triggered.
template <typename... Args>
[[nodiscard]] bool emplace(Args&&... args) {
auto handler = new CancellationHandler{std::forward<Args>(args)...};
logicAssert(static_cast<bool>(handler),
"CancellationSlot dont allow emplace empty handler");
delete _handler.exchange(handler, std::memory_order_acq_rel);
auto s = state();
if (s != CancellationType::none) {
// if slot is canceled but handler doesn't exec, return false
return clear() != handler || !canceled(s);
}
return true;
}
// Clear the signal handler. If a null pointer is returned, it indicates
// that the signal handler has been executed or that no signal has been
// registered yet.
void* clear() {
CancellationHandler* handler =
_handler.exchange(nullptr, std::memory_order_acq_rel);
delete handler;
return handler;
}

class FilterGuard {
private:
FilterGuard(CancellationSlot* _slot,
CancellationType _newFilter) noexcept
: _oldFilter(_slot->getFilter()) {
auto newFilter = static_cast<uint64_t>(_oldFilter) &
static_cast<uint64_t>(_newFilter);
_slot->setFilter(static_cast<CancellationType>(newFilter));
}
~FilterGuard() noexcept {
if (_slot->getFilter() != forbidden)
_slot->setFilter(_oldFilter);
}
friend class CancellationSlot;
CancellationSlot* _slot;
CancellationType _oldFilter;
};

// Filter signals within the specified scope. If signal type & filter is 0,
// then the signal type will not be triggered within this scope. Nested
// filters are allowed.
[[nodiscard]] FilterGuard addScopedFilter(
CancellationType filter) noexcept {
return FilterGuard{this, filter};
}

// Get the current scope's filter.
CancellationType getFilter() const noexcept {
return _filter.load(std::memory_order_acquire);
}

// Get the current cancellation signal status.
CancellationType state() const noexcept { return _signal->state(); }

// Check whether the filtered cancellation signal is in a triggered state.
bool canceled() const noexcept { return canceled(state()); }

// The slot holds ownership of the corresponding signal, so the signal's
// lifetime is always longer than the slot's. To extend the signal's
// lifetime, you can call signal()->shared_from_this(), or start a new
// coroutine with the signal.
CancellationSignal* signal() const noexcept { return _signal.get(); }

// ignore all subsequent cancellation signals.
void forbidCancellation() noexcept {
setFilter(forbidden);
clear();
}

friend class CancellationSignal;

// those helper static functions are used for coroutine await_resume,
// await_suspend & await_ready

template <typename... Args>
[[nodiscard]] static bool suspend(CancellationSlot* slot,
Args&&... args) noexcept {
if (slot &&
!slot->emplace(std::forward<Args>(args)...)) { // has canceled
return false;
}
return true;
}

static void resume(CancellationSlot* slot) {
if (slot && slot->clear() == nullptr && slot->canceled()) {
throw std::system_error{
std::make_error_code(std::errc::operation_canceled)};
}
}

static void resume(CancellationSlot* slot, std::error_code& ec) {
if (slot && slot->clear() == nullptr && slot->canceled()) {
ec = std::make_error_code(std::errc::operation_canceled);
}
}

static bool ready(CancellationSlot* slot) noexcept {
return slot && slot->canceled();
}

private:
void setFilter(CancellationType filter) noexcept {
_filter.store(filter, std::memory_order_release);
}
bool canceled(CancellationType state) const noexcept {
return static_cast<uint64_t>(state) &
static_cast<uint64_t>(_filter.load(std::memory_order_acquire)) &
static_cast<uint64_t>(CancellationType::all);
}
void operator()(CancellationType state) noexcept {
if (canceled(state)) {
if (CancellationHandler* handler =
_handler.exchange(nullptr, std::memory_order_acq_rel)) {
(*handler)(state);
delete handler;
}
}
}
std::shared_ptr<CancellationSignal> _signal;
std::atomic<CancellationHandler*> _handler;
std::atomic<CancellationType> _filter;
};

inline bool CancellationSignal::emit(
CancellationType state = CancellationType::terminal) noexcept {
CancellationType expected = CancellationType::none;
if (state != expected) {
if (_state.compare_exchange_strong(expected, state,
std::memory_order_release)) {
for (Node *node = _slotsHead.exchange(&signalEmittedTag,
std::memory_order_acq_rel),
*next_node;
node != nullptr; node = next_node) {
(*node->_slot)(state);
next_node = node->_next;
delete node;
}
return true;
} else {
return false;
}
}
return true;
}

inline void CancellationSignal::registSlot(CancellationSlot* slot) {
auto next_node = _slotsHead.load(std::memory_order_relaxed);
if (next_node == &signalEmittedTag) {
return;
}
auto* node = new CancellationSignal::Node{slot};
node->_next = next_node;
while (node->_next != &signalEmittedTag &&
!_slotsHead.compare_exchange_weak(node->_next, node,
std::memory_order_release,
std::memory_order_relaxed))
;
if (node->_next == &signalEmittedTag) {
delete node;
}
}

inline CancellationSignal::~CancellationSignal() {
for (Node *node = _slotsHead.load(std::memory_order_acquire), *next_node;
node != nullptr && node != &signalEmittedTag; node = next_node) {
next_node = node->_next;
delete node;
}
}

}; // namespace async_simple

#endif // ASYNC_SIMPLE_CANCELLATION_H
Loading

0 comments on commit a1610ed

Please sign in to comment.