From 5cac48e3b0e5576e91fa2f2ce64c60024a0d265b Mon Sep 17 00:00:00 2001 From: Frank Osterfeld Date: Mon, 23 Oct 2023 00:16:22 +0200 Subject: [PATCH] refactor scheduler thread --- src/service/gnuradio/GnuRadioWorker.hpp | 46 +++++++++++++++++-------- 1 file changed, 32 insertions(+), 14 deletions(-) diff --git a/src/service/gnuradio/GnuRadioWorker.hpp b/src/service/gnuradio/GnuRadioWorker.hpp index c247e0ce..95bcbb92 100644 --- a/src/service/gnuradio/GnuRadioWorker.hpp +++ b/src/service/gnuradio/GnuRadioWorker.hpp @@ -39,9 +39,10 @@ struct PollerKey { template class GnuRadioWorker : public Worker { - gr::plugin_loader *_plugin_loader; - std::jthread _notifyThread; - std::jthread _schedulingThread; + gr::plugin_loader *_plugin_loader; + std::jthread _notifyThread; + std::optional _flow_graph; + std::mutex _flow_graph_mutex; std::chrono::milliseconds _rate; std::string _graphGrc; @@ -53,28 +54,43 @@ class GnuRadioWorker : public Worker sched(std::move(flowGraph)); - sched.runAndWait(); - } catch (const std::string &e) { - fmt::println("Could not load flow graph: {}", e); + try { + if (!_graphGrc.empty()) { + _flow_graph = gr::load_grc(*_plugin_loader, _graphGrc); } - }); + } catch (const std::string &e) { + throw std::runtime_error(fmt::format("Could not load flow graph: {}", e)); + } // TODO instead of a notify thread with polling, we could also use callbacks. This would require // the ability to unregister callbacks though (RAII callback "handles" using shared_ptr/weak_ptr like it works for pollers??) _notifyThread = std::jthread([this, &rate](const std::stop_token &stoken) { fmt::print("GnuRadioWorker: starting notify thread\n"); - std::chrono::time_point update = std::chrono::system_clock::now(); + std::chrono::time_point update = std::chrono::system_clock::now(); + std::jthread schedulerThread; // TODO: current load_grc creates Foo types no matter what the original type was // when supporting more types, we need some type erasure here std::map::Poller>> streamingPollers; while (!stoken.stop_requested()) { + { + std::lock_guard lg{ _flow_graph_mutex }; + if (_flow_graph) { + if (schedulerThread.joinable()) { + schedulerThread.request_stop(); + schedulerThread.join(); + } + auto runGraph = [](gr::Graph fg) { + // TODO check stop_requested and abort scheduler (use start()/stop() and multithreading) + gr::scheduler::Simple<> sched(std::move(fg)); + sched.runAndWait(); + }; + auto fg = std::exchange(_flow_graph, {}); + schedulerThread = std::jthread(runGraph, std::move(*fg)); + } + } for (auto subTopic : super_t::activeSubscriptions()) { // loop over active subscriptions + // TODO drop pollers for removed subscriptions (to not block in data sink) if (subTopic.path() != serviceName.c_str()) { continue; } @@ -134,6 +150,9 @@ class GnuRadioWorker : public Worker