Skip to content

Commit

Permalink
use int64_t as priority level
Browse files Browse the repository at this point in the history
  • Loading branch information
poor-circle committed Nov 7, 2024
1 parent 7ef749f commit b382f53
Show file tree
Hide file tree
Showing 8 changed files with 26 additions and 24 deletions.
14 changes: 7 additions & 7 deletions async_simple/Executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,11 +98,11 @@ class Executor {
// implementation. However, to avoid spinlock deadlock, when priority_hint
// is less than 0, scheduler shouldn't always execute the work immediately.
virtual bool schedule(Func func,
[[maybe_unused]] int priority_hint = 0) = 0;
[[maybe_unused]] int64_t priority_hint = 0) = 0;

// Schedule a move only functor
bool schedule_move_only(util::move_only_function<void()> func,
[[maybe_unused]] int priority_hint = 0) {
[[maybe_unused]] int64_t priority_hint = 0) {
MoveWrapper<decltype(func)> tmp(std::move(func));
return schedule([func = tmp]() { func.get()(); }, priority_hint);
}
Expand Down Expand Up @@ -134,7 +134,7 @@ class Executor {
// Use
// co_await executor.after(sometime)
// to schedule current execution after some time.
TimeAwaitable after(Duration dur, int priority_hint = 0);
TimeAwaitable after(Duration dur, int64_t priority_hint = 0);

// IOExecutor accepts IO read/write requests.
// Return nullptr if the executor doesn't offer an IOExecutor.
Expand All @@ -143,7 +143,7 @@ class Executor {
}

protected:
virtual void schedule(Func func, Duration dur, int priority_hint = 0) {
virtual void schedule(Func func, Duration dur, int64_t priority_hint = 0) {
std::thread([this, func = std::move(func), dur]() {
std::this_thread::sleep_for(dur);
schedule(std::move(func));
Expand All @@ -157,7 +157,7 @@ class Executor {
// Awaiter to implement Executor::after.
class Executor::TimeAwaiter {
public:
TimeAwaiter(Executor *ex, Executor::Duration dur, int priority_hint)
TimeAwaiter(Executor *ex, Executor::Duration dur, int64_t priority_hint)
: _ex(ex), _dur(dur), _priority_hint(priority_hint) {}

public:
Expand All @@ -178,7 +178,7 @@ class Executor::TimeAwaiter {
// Awaitable to implement Executor::after.
class Executor::TimeAwaitable {
public:
TimeAwaitable(Executor *ex, Executor::Duration dur, int priority_hint)
TimeAwaitable(Executor *ex, Executor::Duration dur, int64_t priority_hint)
: _ex(ex), _dur(dur), _priority_hint(priority_hint) {}

auto coAwait(Executor *) {
Expand All @@ -192,7 +192,7 @@ class Executor::TimeAwaitable {
};

Executor::TimeAwaitable inline Executor::after(Executor::Duration dur,
int priority_hint) {
int64_t priority_hint) {
return Executor::TimeAwaitable(this, dur, priority_hint);
}

Expand Down
4 changes: 2 additions & 2 deletions async_simple/coro/Sleep.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ namespace coro {
// e.g. co_await sleep(100s);
template <typename Rep, typename Period>
Lazy<void> sleep(std::chrono::duration<Rep, Period> dur,
int priority_hint = 0) {
int64_t priority_hint = 0) {
auto ex = co_await CurrentExecutor();
if (!ex) {
std::this_thread::sleep_for(dur);
Expand All @@ -43,7 +43,7 @@ Lazy<void> sleep(std::chrono::duration<Rep, Period> dur,

template <typename Rep, typename Period>
Lazy<void> sleep(Executor* ex, std::chrono::duration<Rep, Period> dur,
int priority_hint = 0) {
int64_t priority_hint = 0) {
co_return co_await ex->after(
std::chrono::duration_cast<Executor::Duration>(dur), priority_hint);
}
Expand Down
3 changes: 2 additions & 1 deletion async_simple/coro/test/ResumeByScheduleTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,8 @@ class MockExecutorForResumeBySchedule : public SimpleExecutor {
explicit MockExecutorForResumeBySchedule(size_t thread_num)
: SimpleExecutor(thread_num), schedule_count_(0), checkin_count_(0) {}

bool schedule(Func func, [[maybe_unused]] int priority_hint = 0) override {
bool schedule(Func func,
[[maybe_unused]] int64_t priority_hint = 0) override {
++schedule_count_;
return Base::schedule(std::move(func));
}
Expand Down
4 changes: 2 additions & 2 deletions async_simple/coro/test/SleepTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,11 @@ TEST_F(SleepTest, testSleep) {
class Executor : public async_simple::Executor {
public:
Executor() = default;
virtual bool schedule(Func, int priority_hint = 0) override {
virtual bool schedule(Func, int64_t priority_hint = 0) override {
return true;
}
virtual void schedule(Func func, Duration dur,
int priority_hint = 0) override {
int64_t priority_hint = 0) override {
std::thread([this, func = std::move(func), dur]() {
id = std::this_thread::get_id();
std::this_thread::sleep_for(dur);
Expand Down
3 changes: 2 additions & 1 deletion async_simple/executors/SimpleExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ class SimpleExecutor : public Executor {
~SimpleExecutor() { _ioExecutor.destroy(); }

public:
bool schedule(Func func, [[maybe_unused]] int priority_hint = 0) override {
bool schedule(Func func,
[[maybe_unused]] int64_t priority_hint = 0) override {
return _pool.scheduleById(std::move(func)) ==
util::ThreadPool::ERROR_NONE;
}
Expand Down
6 changes: 3 additions & 3 deletions demo_example/asio_coro_util.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,8 @@ class AsioExecutor : public async_simple::Executor {
return &current;
}

virtual bool schedule(Func func, int prority_hint) override {
if (prority_hint < 0) {
virtual bool schedule(Func func, int64_t priority_hint) override {
if (priority_hint < 0) {
asio::post(executor_, std::move(func));
} else {
asio::dispatch(executor_, std::move(func));
Expand All @@ -154,7 +154,7 @@ class AsioExecutor : public async_simple::Executor {
}

private:
void schedule(Func func, Duration dur, int hint) override {
void schedule(Func func, Duration dur, int64_t priority_hint) override {
auto timer = std::make_unique<asio::steady_timer>(executor_, dur);
auto tm = timer.get();
tm->async_wait([fn = std::move(func),
Expand Down
10 changes: 5 additions & 5 deletions docs/docs.cn/Executor.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,19 +31,19 @@ public:
using Func = std::function<void()>;
using Context = void *;
virtual bool schedule(Func func, int prority_hint = 0) = 0;
virtual bool schedule(Func func, Duration d, int priority_hint = 0) = 0;
bool schedule_move_only(util::move_only_function<void()> func, int prority_hint = 0);
virtual bool schedule(Func func, int64_t priority_hint = 0) = 0;
virtual bool schedule(Func func, Duration d, int64_t priority_hint = 0) = 0;
bool schedule_move_only(util::move_only_function<void()> func, int64_t priority_hint = 0);
virtual bool currentThreadInExecutor() const = 0;
virtual ExecutorStat stat() const = 0;
virtual Context checkout();
virtual bool checkin(Func func, Context ctx, ScheduleOptions opts);
virtual IOExecutor* getIOExecutor() = 0;
```

- `virtual bool schedule(Func func, int prority_hint) = 0;` 接口接收一个lambda函数进行调度执行。实现时将该lambda调度到任意一个线程执行即可。当 `schedule(Func)` 返回 `true` 时,该调度器实现需要保证 func 一定会被执行。否则被提交到调度器的 Lazy 任务可能会导致程序出现内存泄漏问题。
- `virtual bool schedule(Func func, int64_t priority_hint) = 0;` 接口接收一个lambda函数进行调度执行。实现时将该lambda调度到任意一个线程执行即可。当 `schedule(Func)` 返回 `true` 时,该调度器实现需要保证 func 一定会被执行。否则被提交到调度器的 Lazy 任务可能会导致程序出现内存泄漏问题。
- `priority_hint` 用于指导调度器的调度顺序,数字越大暗示任务的优先级越高,默认的调度优先级为0。具体调度逻辑由executor的实现决定,不做强制规定。但需要注意的是,为了避免`async_simple::coro::spinlock`死锁,至少需要实现以下逻辑:当优先级小于0时,调度器必须保证当前任务不会总是被立即执行。
- `bool schedule_move_only(util::move_only_function<void()> func, int prority_hint);` 接口可以接受一个move_only/copyable functor进行调度执行,这个接口是为了弥补std::function不能接收move_only functor的缺陷。
- `bool schedule_move_only(util::move_only_function<void()> func, int64_t priority_hint);` 接口可以接受一个move_only/copyable functor进行调度执行,这个接口是为了弥补std::function不能接收move_only functor的缺陷。
- `virtual bool currentThreadInExecutor() const = 0;` 接口用于检查调用者是否运行在当前Executor中。实现时判断当前线程是否隶属于Executor。
- `virtual ExecutorStat stat() const = 0;` 接口获取当前Executor状态信息。实现时可以继承ExecutorStat,添加更多状态信息方便用户调试和监控Executor。
- `virtual Context checkout();` 接口获取当前执行上下文信息。实现时一般可以直接返回当前线程在Executor中唯一标识id。
Expand Down
6 changes: 3 additions & 3 deletions docs/docs.en/Executor.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@ public:
using Func = std::function<void()>;
using Context = void *;
virtual bool schedule(Func func, int priority_hint = 0) = 0;
virtual bool schedule(Func func, Duration d, int priority_hint = 0) = 0;
bool schedule_move_only(util::move_only_function<void()> func, int priority_hint = 0);
virtual bool schedule(Func func, int64_t priority_hint = 0) = 0;
virtual bool schedule(Func func, Duration d, int64_t priority_hint = 0) = 0;
bool schedule_move_only(util::move_only_function<void()> func, int64_t priority_hint = 0);
virtual bool currentThreadInExecutor() const = 0;
virtual ExecutorStat stat() const = 0;
virtual Context checkout();
Expand Down

0 comments on commit b382f53

Please sign in to comment.