Skip to content

Commit

Permalink
Adapt to the latest changes in graph-prototype
Browse files Browse the repository at this point in the history
  • Loading branch information
ivan-cukic committed Mar 7, 2024
1 parent f51dfc6 commit ee2aa3c
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 130 deletions.
212 changes: 88 additions & 124 deletions src/ui/app.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,55 +31,6 @@ enum class WindowMode {
};

namespace detail {
struct SchedWrapper {
template<typename T, typename... Args>
void emplace(Args &&...args) {
handler = std::make_unique<HandlerImpl<T>>(std::forward<Args>(args)...);
}

explicit operator bool() const { return handler != nullptr; };

private:
struct Handler {
virtual ~Handler() = default;
};

template<typename TScheduler>
struct HandlerImpl : Handler {
TScheduler _scheduler;
std::thread _thread;
std::atomic<bool> stopRequested = false;

template<typename... Args>
explicit HandlerImpl(Args &&...args)
: _scheduler(std::forward<Args>(args)...) {
_thread = std::thread([this]() {
if (auto e = _scheduler.changeStateTo(gr::lifecycle::State::INITIALISED); !e) {
// TODO: handle error return message
}
if (auto e = _scheduler.changeStateTo(gr::lifecycle::State::RUNNING); !e) {
// TODO: handle error return message
}
while (!stopRequested && _scheduler.isProcessing()) {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
if (auto e = _scheduler.changeStateTo(gr::lifecycle::State::REQUESTED_STOP); !e) {
// TODO: handle error return message
}
if (auto e = _scheduler.changeStateTo(gr::lifecycle::State::STOPPED); !e) {
// TODO: handle error return message
}
});
}

~HandlerImpl() {
stopRequested = true;
_thread.join();
}
};

std::unique_ptr<Handler> handler;
};
} // namespace detail

class App {
Expand Down Expand Up @@ -119,12 +70,95 @@ class App {
std::shared_ptr<gr::thread_pool::BasicThreadPool> schedulerThreadPool = std::make_shared<gr::thread_pool::BasicThreadPool>(
"scheduler-pool", gr::thread_pool::CPU_BOUND, 4, 4);

detail::SchedWrapper _scheduler;
Style _style = Style::Light;
std::vector<std::function<void()>> _activeCallbacks;
std::vector<std::function<void()>> _garbageCallbacks; // TODO: Cleaning up callbacks
std::mutex _callbacksMutex;

struct SchedWrapper {
template<typename T, typename... Args>
void emplace(Args &&...args) {
handler = std::make_unique<HandlerImpl<T>>(std::forward<Args>(args)...);
}

explicit operator bool() const { return handler != nullptr; };

void sendMessage(const gr::Message &msg) { handler->sendMessage(msg); }
void handleMessages(FlowGraph &fg) { handler->handleMessages(fg); }

private:
struct Handler {
virtual ~Handler() = default;
virtual void sendMessage(const gr::Message &msg) = 0;
virtual void handleMessages(FlowGraph &fg) = 0;
};

template<typename TScheduler>
struct HandlerImpl : Handler {
TScheduler _scheduler;
std::thread _thread;
std::atomic<bool> stopRequested = false;

gr::MsgPortIn _fromScheduler;
gr::MsgPortOut _toScheduler;

template<typename... Args>
explicit HandlerImpl(Args &&...args)
: _scheduler(std::forward<Args>(args)...) {
if (_toScheduler.connect(_scheduler.msgIn) != gr::ConnectionResult::SUCCESS) {
throw fmt::format("Failed to connect _toScheduler -> _scheduler.msgIn\n");
}
if (_scheduler.msgOut.connect(_fromScheduler) != gr::ConnectionResult::SUCCESS) {
throw fmt::format("Failed to connect _scheduler.msgOut -> _fromScheduler\n");
}

_thread = std::thread([this]() {
if (auto e = _scheduler.changeStateTo(gr::lifecycle::State::INITIALISED); !e) {
// TODO: handle error return message
}
if (auto e = _scheduler.changeStateTo(gr::lifecycle::State::RUNNING); !e) {
// TODO: handle error return message
}
while (!stopRequested && _scheduler.isProcessing()) {
_scheduler.processScheduledMessages();
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
if (auto e = _scheduler.changeStateTo(gr::lifecycle::State::REQUESTED_STOP); !e) {
// TODO: handle error return message
}
if (auto e = _scheduler.changeStateTo(gr::lifecycle::State::STOPPED); !e) {
// TODO: handle error return message
}
});
}

void sendMessage(const gr::Message &msg) final {
_toScheduler.streamWriter().publish([&](auto &output) { output[0] = msg; }, 1);
}

void handleMessages(FlowGraph &fg) final {
const auto available = _fromScheduler.streamReader().available();
if (available > 0) {
const auto messages = _fromScheduler.streamReader().get(available);

for (const auto &msg : messages) {
fg.handleMessage(msg);
}
std::ignore = _fromScheduler.streamReader().consume(available);
}
}

~HandlerImpl() {
stopRequested = true;
_thread.join();
}
};

std::unique_ptr<Handler> handler;
};

SchedWrapper _scheduler;

public:
App() noexcept { setStyle(Style::Light); }

Expand Down Expand Up @@ -212,86 +246,16 @@ class App {
}

void sendMessage(const gr::Message &msg) {
if (m_scheduler) {
m_scheduler.sendMessage(msg);
if (_scheduler) {
_scheduler.sendMessage(msg);
}
}

void handleMessages(FlowGraph &fg) {
if (m_scheduler) {
m_scheduler.handleMessages(fg);
if (_scheduler) {
_scheduler.handleMessages(fg);
}
}

private:
struct SchedWrapper {
template<typename T, typename... Args>
void emplace(Args &&...args) {
handler = std::make_unique<HandlerImpl<T>>(std::forward<Args>(args)...);
}
explicit operator bool() const { return handler != nullptr; };
void sendMessage(const gr::Message &msg) { handler->sendMessage(msg); }
void handleMessages(FlowGraph &fg) { handler->handleMessages(fg); }

private:
struct Handler {
virtual ~Handler() = default;
virtual void sendMessage(const gr::Message &msg) = 0;
virtual void handleMessages(FlowGraph &fg) = 0;
};
template<typename T>
struct HandlerImpl : Handler {
T data;
std::thread thread;
std::atomic<bool> stopRequested = false;
gr::MsgPortInNamed<"__Builtin"> msgIn;
gr::MsgPortOutNamed<"__Builtin"> msgOut;

template<typename... Args>
explicit HandlerImpl(Args &&...args)
: data(std::forward<Args>(args)...) {
msgOut.connect(data.msgIn);
data.msgOut.connect(msgIn);
thread = std::thread([this]() {
data.init();
data.start();
while (!stopRequested && data.isProcessing()) {
data.processScheduledMessages();
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
data.stop();
});
}
~HandlerImpl() {
stopRequested = true;
thread.join();
}
void sendMessage(const gr::Message &msg) final {
auto t = gr::messageField<std::string>(msg, gr::message::key::Target);
msgOut.streamWriter().publish([&](auto &output) { output[0] = msg; }, 1);
}
void handleMessages(FlowGraph &fg) final {
const auto available = msgIn.streamReader().available();
if (available > 0) {
const auto messages = msgIn.streamReader().get(available);

for (const auto &msg : messages) {
fg.handleMessage(msg);
}
auto c = msgIn.streamReader().consume(available);
}
}
};

std::unique_ptr<Handler> handler;
};

SchedWrapper m_scheduler;

Style m_style = Style::Light;
std::vector<std::function<void()>> m_activeCallbacks;
std::vector<std::function<void()>> m_garbageCallbacks; // TODO: Cleaning up callbacks
std::mutex m_callbacksMutex;
};

} // namespace DigitizerUi
Expand Down
12 changes: 9 additions & 3 deletions src/ui/blocks/SineSource.hpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@

#ifndef OPENDIGITIZER_SINESOURCE_HPP
#define OPENDIGITIZER_SINESOURCE_HPP

Expand All @@ -12,6 +11,7 @@ namespace opendigitizer {
template<typename T>
requires std::is_arithmetic_v<T>
struct SineSource : public gr::Block<SineSource<T>, gr::BlockingIO<true>> {
using super_t = gr::Block<SineSource<T>, gr::BlockingIO<true>>;
gr::MsgPortIn freqIn;
gr::MsgPortOut freqOut;
gr::PortOut<T> out{};
Expand All @@ -35,8 +35,12 @@ struct SineSource : public gr::Block<SineSource<T>, gr::BlockingIO<true>> {
}

void
processMessages(auto &, std::span<const gr::Message> message) {
fmt::print("received Message: {}", message.size());
processMessages(gr::MsgPortInNamed<"__Builtin"> &port, std::span<const gr::Message> messages) {
super_t::processMessages(port, messages);
}

void
processMessages(gr::MsgPortIn &, std::span<const gr::Message> message) {
std::ranges::for_each(message, [this](auto &m) {
if (m.contains("frequency")) {
using namespace std::string_literals;
Expand Down Expand Up @@ -96,5 +100,7 @@ struct SineSource : public gr::Block<SineSource<T>, gr::BlockingIO<true>> {
} // namespace opendigitizer

ENABLE_REFLECTION_FOR_TEMPLATE(opendigitizer::SineSource, out, freqIn, freqOut, frequency)
static_assert(gr::traits::block::can_processMessagesForPortStdSpan<opendigitizer::SineSource<float>, gr::MsgPortInNamed<"__Builtin">>);
static_assert(gr::traits::block::can_processMessagesForPortStdSpan<opendigitizer::SineSource<float>, gr::MsgPortIn>);

#endif
6 changes: 4 additions & 2 deletions src/ui/flowgraph.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,12 +93,14 @@ void Block::setParameter(const std::string &name, const pmtv::pmt &p) {
gr::Message msg;
msg[gr::message::key::Target] = m_uniqueName;
msg[gr::message::key::Kind] = gr::message::kind::UpdateSettings;
msg[gr::message::key::Data] = m_parameters;
msg[gr::message::key::Data] = gr::property_map{ { name, p } };
App::instance().sendMessage(msg);
}

void Block::updateSettings(gr::property_map &&settings) {
m_parameters = std::move(settings);
for (const auto &[k, v] : settings) {
m_parameters[k] = v;
}
}

void Block::update() {
Expand Down
2 changes: 1 addition & 1 deletion src/ui/flowgraph.h
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ struct DataType {

const std::string &toString() const;

inline operator Id() const { return m_id; }
inline operator Id() const { return m_id; }

private:
Id m_id = Id::Untyped;
Expand Down

0 comments on commit ee2aa3c

Please sign in to comment.