Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
sleepy-monax committed Aug 6, 2023
1 parent 1648cdb commit d87ec43
Show file tree
Hide file tree
Showing 21 changed files with 650 additions and 145 deletions.
9 changes: 9 additions & 0 deletions src/libs/karm-async/_embed.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
#pragma once

#include "loop.h"

namespace Karm::Async::_Embed {

EventLoop &eventLoop();

} // namespace Karm::Async::_Embed
178 changes: 49 additions & 129 deletions src/libs/karm-async/async.h
Original file line number Diff line number Diff line change
@@ -1,171 +1,91 @@
#pragma once

#include <karm-base/func.h>
#include <karm-base/list.h>
#include <karm-base/res.h>
#include <karm-base/time.h>
#include <karm-base/tuple.h>
#include <karm-meta/nocopy.h>
#include <karm-sys/proc.h>
#include <karm-sys/time.h>
#include <karm-logger/logger.h>

#include "funcs.h"

namespace Karm::Async {

template <typename T = void>
using Coro = std::coroutine_handle<T>;

using Eval = Func<TimeStamp()>;
template <typename T = Ok<>>
struct Task {
using Inner = T;

struct Sched {
struct Queued {
Coro<> coro;
Eval eval;
struct Ret : public BaseEvent<Event> {
T res;
};

List<Queued> _queue;

static Sched &instance() {
static Sched sched;
return sched;
}

void queue(Coro<> coro, Eval eval) {
_queue.emplaceBack(coro, std::move(eval));
}

TimeStamp schedule(TimeStamp now) {
auto soon = TimeStamp::endOfTime();
for (usize i = 0; i < _queue.len(); i++) {
auto &queued = _queue.peekFront();
auto evaled = queued.eval();
if (Op::lteq(evaled, now)) {
queued.coro.resume();
_queue.popFront();
return now;
}

if (Op::lt(evaled, soon)) {
soon = evaled;
}

_queue.requeue();
struct Promise {
Opt<Strong<Async::Sink>> _sink;
Opt<T> _res = NONE;
Task get_return_object() { return {Coro<Promise>::from_promise(*this)}; }
std::suspend_never initial_suspend() { return {}; }
std::suspend_always final_suspend() noexcept { return {}; }
void unhandled_exception() {}
void return_value(T res) {
if (_sink)
post<Ret>(*_sink, std::move(res));
else
logWarn("Task::Promise::return_value() called without a sink");
}
auto take() { return _res.take(); }
void bind(Strong<Async::Sink> sink) { _sink = sink; }
};

return soon;
}

Res<> run() {
while (_queue.len() > 0) {
TimeStamp now = Sys::now();
auto soon = schedule(now);
while (Op::lteq(soon, now)) {
now = Sys::now();
soon = schedule(now);
}

if (_queue.len() > 0)
try$(Sys::sleepUntil(soon));
}

return Ok();
}
};

template <typename Task>
struct Promise {
Opt<typename Task::Res> _res = NONE;
Task get_return_object() { return {Coro<Promise>::from_promise(*this)}; }
std::suspend_never initial_suspend() { return {}; }
std::suspend_always final_suspend() noexcept { return {}; }
void unhandled_exception() {}
void return_value(typename Task::Res res) { _res = res; }
};

template <typename _Res = Ok<>>
struct Task : public Meta::NoCopy {
using Res = _Res;
using PromiseType = Promise<Task>;
using CoroType = Coro<PromiseType>;

using promise_type = PromiseType;

CoroType _coro;
Coro<Promise> _coro;

Task(Coro<Promise<Task>> coro)
Task(Coro<Promise> coro)
: _coro(coro) {}

Task(Task &&other)
: _coro(std::exchange(other._coro, {})) {}

~Task() {
if (_coro) {
if (_coro)
_coro.destroy();
}
}

/* --- Awaitable -------------------------------------------------------- */

bool await_ready() const noexcept {
return false;
Promise &promise() {
return _coro.promise();
}

void await_suspend(Coro<> coro) const noexcept {
Sched::instance().queue(coro, [&]() {
return _coro.done() ? TimeStamp::epoch() : TimeStamp::endOfTime();
});
void bind(Strong<Sink> sink) {
promise().bind(std::move(sink));
}

Res await_resume() const noexcept {
return _coro.promise()._res.unwrap();
}

/* --- Run -------------------------------------------------------------- */
// Theses methods and types are required by the coroutine machinery

Karm::Res<Res> runSync() {
while (!_coro.done()) {
auto until = Sched::instance().schedule(Sys::now());
try$(Sys::sleepUntil(until));
}
return Ok(_coro.promise()._res.take());
bool await_ready() const noexcept {
return false;
}
};

inline auto sleep(TimeSpan span) {
struct Awaitable {
TimeStamp _t;

constexpr bool await_ready() const noexcept { return false; }

void await_suspend(Coro<> coro) const noexcept {
Sched::instance().queue(coro, [t = _t]() {
return t;
});
}

void await_resume() const noexcept {}
};

return Awaitable{Sys::now() + span};
}
void await_suspend(Coro<> coro) const noexcept {
struct Sink : public Async::Sink {
Coro<> _coro;

inline auto yield() {
struct Awaitable {
constexpr bool await_ready() const noexcept { return false; }
Res<> handle(Async::Event &) override {
_coro.resume();
return Ok();
}
};

void await_suspend(Coro<> coro) const noexcept {
Sched::instance().queue(coro, []() {
return TimeStamp::epoch();
});
}
bind(makeStrong<Sink>(coro));
}

void await_resume() const noexcept {}
};
T await_resume() const noexcept {
return promise().take();
}

return Awaitable{};
}
using promise_type = Promise;
};

template <typename... Args>
inline Task<Tuple<typename Args::Res...>> all(Args &...tasks) {
inline Task<Tuple<typename Args::Inner...>> all(Args &...tasks) {
co_return {co_await tasks...};
}

Expand Down
8 changes: 8 additions & 0 deletions src/libs/karm-async/core.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
#pragma once

#include "_embed.h"

#include "defer.h"
#include "funcs.h"
#include "loop.h"
#include "timer.h"
58 changes: 58 additions & 0 deletions src/libs/karm-async/defer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
#pragma once

#include <karm-base/func.h>

#include "async.h"
#include "funcs.h"

namespace Karm::Async {

struct Defer : public Sink {
Func<Res<>()> _func;

struct Event : public BaseEvent<Event> {
Strong<Sink> sink;
Event(Strong<Sink> sink) : sink{std::move(sink)} {}
};

Res<> handle(Async::Event &) override {
return _func();
}
};

static inline void defer(Strong<Sink> sink) {
post<Defer::Event>(*sink, sink);
}

static inline Strong<Defer> defer(Func<Res<>()> func) {
auto sink = makeStrong<Defer>(func);
defer(sink);
return sink;
}

static inline auto defer() {
struct Sink : public Async::Sink {
Coro<> _coro;

Res<> handle(Async::Event &) override {
_coro.resume();
return Ok();
}
};

struct Await {
TimeStamp _stamp;

constexpr bool await_ready() const { return false; }

void await_suspend(Coro<> coro) {
defer(makeStrong<Sink>(coro));
}

void await_resume() const {}
};

return Await{};
}

} // namespace Karm::Async
56 changes: 56 additions & 0 deletions src/libs/karm-async/event.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
#pragma once

#include <karm-meta/id.h>

namespace Karm::Async {

struct Event {
bool _accepted{false};

virtual ~Event() = default;

virtual Meta::Type<> type() const = 0;

void accept() {
_accepted = true;
}

template <typename T>
T &unwrap() {
return static_cast<T &>(*this);
}

template <typename T>
T const &unwrap() const {
return static_cast<T const &>(*this);
}

template <typename T>
T const *is() const {
return type() == Meta::typeOf<T>() ? &unwrap<T>() : nullptr;
}

template <typename T>
T *is() {
return type() == Meta::typeOf<T>() ? &unwrap<T>() : nullptr;
}

template <typename T>
Event &handle(auto callback) {
if (auto *e = is<T>()) {
_accepted = _accepted or callback(*e);
}
return *this;
}
};

template <typename Crtp>
struct BaseEvent : public Event {
Meta::Type<Crtp> _type = Meta::typeOf<Crtp>();

Meta::Type<> type() const override {
return _type;
}
};

} // namespace Karm::Async
26 changes: 26 additions & 0 deletions src/libs/karm-async/funcs.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
#pragma once

#include "_embed.h"

namespace Karm::Async {

inline Res<> run() {
return _Embed::eventLoop().run();
}

inline void quit(Res<> res = Ok()) {
_Embed::eventLoop().quit(std::move(res));
}

inline TimeStamp now() {
return _Embed::eventLoop().now();
}

template <typename T, typename... Args>
inline void post(Sink &sink, Args &&...args) {
_Embed::eventLoop()
.queue()
.post(sink, makeBox<T>(std::forward<Args>(args)...));
}

} // namespace Karm::Async
Loading

0 comments on commit d87ec43

Please sign in to comment.