Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
sleepy-monax committed Jul 27, 2024
1 parent ab2eb72 commit 34688b3
Show file tree
Hide file tree
Showing 11 changed files with 180 additions and 76 deletions.
2 changes: 1 addition & 1 deletion meta/plugins/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@

ensure((0, 7, 0))

from . import idl, start, tools # noqa E402, F401: Needed for side effect
from . import start, tools # noqa E402, F401: Needed for side effect
17 changes: 0 additions & 17 deletions meta/plugins/idl.py

This file was deleted.

1 change: 0 additions & 1 deletion meta/plugins/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,2 @@
python-magic ~= 0.4.27
git+https://github.com/cute-engineering/[email protected]
git+https://github.com/cute-engineering/chatty
12 changes: 0 additions & 12 deletions src/libs/karm-ipc/hook.h
Original file line number Diff line number Diff line change
@@ -1,15 +1,3 @@
#pragma once

#include <karm-sys/context.h>
#include <karm-sys/socket.h>

struct ChannelHook : public Sys::Service {
Sys::IpcConnection con;

ChannelHook(Sys::IpcConnection con)
: con(std::move(con)) {}
};

inline ChannelHook &useChannel(Sys::Context &ctx = Sys::globalContext()) {
return ctx.use<ChannelHook>();
}
151 changes: 151 additions & 0 deletions src/libs/karm-sys/ipc.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
#pragma once

#include <karm-base/cons.h>
#include <karm-io/pack.h>
#include <karm-logger/logger.h>
#include <karm-sys/async.h>
#include <karm-sys/context.h>
#include <karm-sys/socket.h>

struct ChannelHook : public Sys::Service {
Sys::IpcConnection con;

ChannelHook(Sys::IpcConnection con)
: con(std::move(con)) {}
};

inline ChannelHook &useChannel(Sys::Context &ctx = Sys::globalContext()) {
return ctx.use<ChannelHook>();
}

namespace Karm::Sys {

using Port = Distinct<u64, struct _PortTag>;

struct Header {
u64 seq;
Port port;
Meta::Id mid;
};

struct Message {
Array<u8, 4096> buf;
usize len;
Array<Sys::Handle, 16> hnds;
usize hndsLen;

Res<Header> header() const {
Io::PackScan s{buf, hnds};
return Io::unpack<Header>(s);
}

template <typename T>
bool is() const {
auto maybeHeader = header();
if (not maybeHeader)
return false;
return maybeHeader.unwrap().mid == Meta::idOf<T>();
}

template <typename T>
Res<T> unpack() {
Io::PackScan s{buf, hnds};
if (not is<T>())
return panic("unexpected message");
try$(Io::unpack<Header>(s));
return Io::unpack<T>(s);
}
};

struct Ipc {
Sys::IpcConnection _con;
bool _receiving = false;
Map<u64, Async::_Promise<Message>> _pending{};
u64 _seq = 1;

static Ipc create(Sys::Context &ctx) {
auto &channel = useChannel(ctx);
return Ipc{std::move(channel.con)};
}

template <typename T, typename... Args>
Res<> _send(Port port, u64 seq, Args &&...args) {
Header header{seq, port, Meta::idOf<T>()};
T msg{std::forward<Args>(args)...};

Io::BufferWriter reqBuf;
Io::PackEmit reqPack{reqBuf};

co_try$(Io::pack(reqPack, header));
co_try$(Io::pack(reqPack, msg));

co_trya$(_con.sendAsync(reqBuf.bytes(), reqPack.handles()));

co_return Ok();
}

template <typename T, typename... Args>
Res<> send(Port port, Args &&...args) {
return _sendAsync<T>(port, _seq++, std::forward<Args>(args)...);
}

Async::Task<Message> recvAsync() {
if (_receiving)
co_return Error::other("already receiving");

_receiving = true;
Defer defer{[this] {
_receiving = false;
}};

while (true) {
Message msg;
auto [bufLen, hndsLen] = co_trya$(_con.recvAsync(msg.buf, msg.hnds));
msg.len = bufLen;
msg.hndsLen = hndsLen;

auto maybeHeader = msg.header();
if (not maybeHeader) {
logWarn("dropping message: {}", maybeHeader.none().msg());
continue;
}

if (_pending.has(maybeHeader.unwrap().seq)) {
auto promise = _pending.take(maybeHeader.unwrap().seq);
promise.resolve(msg);
continue;
}

co_return msg;
}
}

template <typename T>
Res<> resp(Message &msg, Res<typename T::Response> message) {
auto header = try$(msg.header());
send<T>(header.port, header.seq, message);
}

template <typename T, typename... Args>
Async::Task<typename T::Response> callAsync(Port port, Args &&...args) {
auto seq = _seq++;
Async::_Promise<Message> promise;
auto future = promise.future();

_pending.put(seq, std::move(promise));

co_try$(_send<T>(port, seq, std::forward<Args>(args)...));

Message msg = co_await future;

if (msg.is<Error>())
co_return msg.unpack<Error>();

if (not msg.is<typename T::Response>())
co_return Error::invalidInput("unexpected response");

co_return Ok(msg.unpack<typename T::Response>());
}
};

} // namespace Karm::Sys
7 changes: 0 additions & 7 deletions src/srvs/grund-bus/api.idl

This file was deleted.

13 changes: 0 additions & 13 deletions src/srvs/grund-device/api.idl

This file was deleted.

10 changes: 5 additions & 5 deletions src/srvs/grund-dns/main.cpp
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
#include <grund-echo/api.h>
#include <karm-ipc/ipc.h>
#include <karm-logger/logger.h>
#include <karm-sys/entry.h>
#include <karm-sys/ipc.h>

Async::Task<> entryPointAsync(Sys::Context &ctx) {
auto server = co_try$(Ipc::Server::create(ctx));
Sys::Ipc ipc = Sys::Ipc::create(ctx);

logInfo("service started");

logDebug("sending nonsense to system ---------------------------------------");

auto object = Ipc::open<Grund::IEcho>(server, 0);
auto res = co_trya$(object->echoAsync("hello"s));
auto [header, resp] = co_trya$(ipc.callAsync<Grund::Echo::Request>({}, "nonsense"s));

logDebug("received response from system -----------------------------------");

co_return co_trya$(server.runAsync());
co_return Ok();
}
12 changes: 12 additions & 0 deletions src/srvs/grund-echo/api.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
#pragma once

#include <karm-base/string.h>

namespace Grund::Echo {

struct Request {
using Response = String;
String msg;
};

} // namespace Grund::Echo
7 changes: 0 additions & 7 deletions src/srvs/grund-echo/api.idl

This file was deleted.

24 changes: 11 additions & 13 deletions src/srvs/grund-echo/main.cpp
Original file line number Diff line number Diff line change
@@ -1,24 +1,22 @@
#include <grund-echo/api.h>
#include <karm-ipc/ipc.h>
#include <karm-sys/entry.h>
#include <karm-sys/ipc.h>

namespace Grund::Echo {

struct Service : public Ipc::Object<Grund::IEcho> {
using Ipc::Object<Grund::IEcho>::Object;

Async::Task<String> echoAsync(String msg) override {
co_return msg;
Async::Task<> serv(Sys::Ipc &ipc) {
while (true) {
auto msg = co_trya$(ipc.recvAsync());
if (msg.is<Request>()) {
auto req = co_try$(msg.unpack<Request>());
co_try$(ipc.resp<Request>(msg, Ok(req.msg)));
}
}
};
}

} // namespace Grund::Echo

Async::Task<> entryPointAsync(Sys::Context &ctx) {
auto server = co_try$(Ipc::Server::create(ctx));
Grund::Echo::Service service{server};

logInfo("service started");

co_return co_trya$(server.runAsync());
Sys::Ipc ipc = Sys::Ipc::create(ctx);
co_return co_await Grund::Echo::serv(ipc);
}

0 comments on commit 34688b3

Please sign in to comment.