Skip to content

Commit

Permalink
refactor scheduler thread
Browse files Browse the repository at this point in the history
  • Loading branch information
frankosterfeld committed Oct 22, 2023
1 parent 6cf5e1e commit 5cac48e
Showing 1 changed file with 32 additions and 14 deletions.
46 changes: 32 additions & 14 deletions src/service/gnuradio/GnuRadioWorker.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,10 @@ struct PollerKey {

template<units::basic_fixed_string serviceName, typename... Meta>
class GnuRadioWorker : public Worker<serviceName, TimeDomainContext, Empty, Acquisition, Meta...> {
gr::plugin_loader *_plugin_loader;
std::jthread _notifyThread;
std::jthread _schedulingThread;
gr::plugin_loader *_plugin_loader;
std::jthread _notifyThread;
std::optional<gr::Graph> _flow_graph;
std::mutex _flow_graph_mutex;
std::chrono::milliseconds _rate;
std::string _graphGrc;

Expand All @@ -53,28 +54,43 @@ class GnuRadioWorker : public Worker<serviceName, TimeDomainContext, Empty, Acqu
// this makes sure the subscriptions are filtered correctly
opencmw::query::registerTypes(TimeDomainContext(), broker);

_schedulingThread = std::jthread([this] {
try {
auto flowGraph = gr::load_grc(*_plugin_loader, _graphGrc);
fmt::println("Loaded flow graph: BEGIN GRC\n{}\nEND GRC", gr::save_grc(flowGraph));
gr::scheduler::Simple<> sched(std::move(flowGraph));
sched.runAndWait();
} catch (const std::string &e) {
fmt::println("Could not load flow graph: {}", e);
try {
if (!_graphGrc.empty()) {
_flow_graph = gr::load_grc(*_plugin_loader, _graphGrc);
}
});
} catch (const std::string &e) {
throw std::runtime_error(fmt::format("Could not load flow graph: {}", e));
}

// TODO instead of a notify thread with polling, we could also use callbacks. This would require
// the ability to unregister callbacks though (RAII callback "handles" using shared_ptr/weak_ptr like it works for pollers??)
_notifyThread = std::jthread([this, &rate](const std::stop_token &stoken) {
fmt::print("GnuRadioWorker: starting notify thread\n");
std::chrono::time_point update = std::chrono::system_clock::now();

std::chrono::time_point update = std::chrono::system_clock::now();
std::jthread schedulerThread;
// TODO: current load_grc creates Foo<double> types no matter what the original type was
// when supporting more types, we need some type erasure here
std::map<PollerKey, std::shared_ptr<basic::DataSink<double>::Poller>> streamingPollers;
while (!stoken.stop_requested()) {
{
std::lock_guard lg{ _flow_graph_mutex };
if (_flow_graph) {
if (schedulerThread.joinable()) {
schedulerThread.request_stop();
schedulerThread.join();
}
auto runGraph = [](gr::Graph fg) {
// TODO check stop_requested and abort scheduler (use start()/stop() and multithreading)
gr::scheduler::Simple<> sched(std::move(fg));
sched.runAndWait();
};
auto fg = std::exchange(_flow_graph, {});
schedulerThread = std::jthread(runGraph, std::move(*fg));
}
}
for (auto subTopic : super_t::activeSubscriptions()) { // loop over active subscriptions
// TODO drop pollers for removed subscriptions (to not block in data sink)
if (subTopic.path() != serviceName.c_str()) {
continue;
}
Expand Down Expand Up @@ -134,6 +150,9 @@ class GnuRadioWorker : public Worker<serviceName, TimeDomainContext, Empty, Acqu
}
update = next_update;
}

schedulerThread.request_stop();
schedulerThread.join();
});

super_t::setCallback([this](RequestContext &rawCtx, const TimeDomainContext &requestContext, const Empty &, TimeDomainContext & /*replyContext*/, Acquisition &out) {
Expand All @@ -154,7 +173,6 @@ class GnuRadioWorker : public Worker<serviceName, TimeDomainContext, Empty, Acqu
~GnuRadioWorker() {
_notifyThread.request_stop();
_notifyThread.join();
_schedulingThread.join();
}
};
} // namespace opendigitizer::acq
Expand Down

0 comments on commit 5cac48e

Please sign in to comment.