Skip to content

Commit

Permalink
Service: Register GR signals at the DNS
Browse files Browse the repository at this point in the history
Ensure that the signal properties are read from the signal_name,
signal_unit etc. properties configured at the source (like PicoScope)
and registered at the DNS. The DNS entries dynamically update as the
signal properties in the flow graph's data sinks change.
  • Loading branch information
frankosterfeld committed Mar 8, 2024
1 parent 7f581cf commit e4b76ef
Show file tree
Hide file tree
Showing 5 changed files with 215 additions and 37 deletions.
2 changes: 1 addition & 1 deletion cmake/Dependencies.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ include(FetchContent)
FetchContent_Declare(
opencmw-cpp
GIT_REPOSITORY https://github.com/fair-acc/opencmw-cpp.git
GIT_TAG ac69eaf402259a6cf74865c583a632703f7cfcff # main as of 2024-02-27
GIT_TAG 48e9305d1a72dcb406ff8a6e7ae0b791b0d0802a # main as of 2024-02-29
)

FetchContent_Declare(
Expand Down
108 changes: 100 additions & 8 deletions src/service/gnuradio/GnuRadioWorker.hpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#ifndef OPENDIGITIZER_SERVICE_GNURADIOWORKER_H
#define OPENDIGITIZER_SERVICE_GNURADIOWORKER_H

#include "gnuradio-4.0/Message.hpp"
#include <daq_api.hpp>

#include <majordomo/Worker.hpp>
Expand All @@ -11,7 +12,6 @@
#include <gnuradio-4.0/Scheduler.hpp>

#include <chrono>
#include <cmath>
#include <memory>
#include <ranges>
#include <string_view>
Expand Down Expand Up @@ -48,6 +48,20 @@ inline std::string findTriggerName(std::span<const gr::Tag> tags) {
return {};
}

template<typename T>
inline std::optional<T> getSetting(const gr::BlockModel &block, const std::string &key) {
try {
const auto setting = block.settings().get(key);
if (!setting) {
return {};
}
return std::get<T>(*setting);
} catch (const std::exception &e) {
fmt::println(std::cerr, "Unexpected type for '{}' property", key);
return {};
}
}

} // namespace detail

using namespace gr;
Expand Down Expand Up @@ -113,6 +127,14 @@ struct StreamingPollerEntry {
}
};

struct SignalEntry {
std::string name;
std::string unit;
float sample_rate;

auto operator<=>(const SignalEntry &) const noexcept = default;
};

struct DataSetPollerEntry {
using SampleType = double;
std::shared_ptr<gr::basic::DataSink<SampleType>::DataSetPoller> poller;
Expand All @@ -121,10 +143,11 @@ struct DataSetPollerEntry {

template<units::basic_fixed_string serviceName, typename... Meta>
class GnuRadioAcquisitionWorker : public Worker<serviceName, TimeDomainContext, Empty, Acquisition, Meta...> {
gr::PluginLoader *_plugin_loader;
std::jthread _notifyThread;
std::unique_ptr<gr::Graph> _pending_flow_graph;
std::mutex _flow_graph_mutex;
gr::PluginLoader *_plugin_loader;
std::jthread _notifyThread;
std::unique_ptr<gr::Graph> _pending_flow_graph;
std::mutex _flow_graph_mutex;
std::function<void(std::vector<SignalEntry>)> _updateSignalEntriesCallback;

public:
using super_t = Worker<serviceName, TimeDomainContext, Empty, Acquisition, Meta...>;
Expand Down Expand Up @@ -153,6 +176,10 @@ class GnuRadioAcquisitionWorker : public Worker<serviceName, TimeDomainContext,
_pending_flow_graph = std::move(fg);
}

void setUpdateSignalEntriesCallback(std::function<void(std::vector<SignalEntry>)> callback) {
_updateSignalEntriesCallback = std::move(callback);
}

private:
void init(std::chrono::milliseconds rate) {
// TODO instead of a notify thread with polling, we could also use callbacks. This would require
Expand All @@ -164,8 +191,10 @@ class GnuRadioAcquisitionWorker : public Worker<serviceName, TimeDomainContext,
std::map<PollerKey, StreamingPollerEntry> streamingPollers;
std::map<PollerKey, DataSetPollerEntry> dataSetPollers;
std::jthread schedulerThread;
std::map<std::string, SignalEntry> signalEntryBySink;
std::unique_ptr<MsgPortOut> toScheduler;
std::unique_ptr<MsgPortIn> fromScheduler;

bool finished = false;

while (!finished) {
Expand All @@ -179,21 +208,64 @@ class GnuRadioAcquisitionWorker : public Worker<serviceName, TimeDomainContext,
auto shutdownMessage = toScheduler->streamWriter().reserve(1);
shutdownMessage[0] = {
{ key::Kind, kind::SchedulerStateChangeRequest },
{ key::What, std::string(magic_enum::enum_name(lifecycle::State::REQUESTED_STOP)) }
{ key::What,
std::string(magic_enum::enum_name(lifecycle::State::REQUESTED_STOP)) }
};
shutdownMessage.publish(1);
}

if (hasScheduler) {
auto messages = fromScheduler->streamReader().get(fromScheduler->streamReader().available());
bool signalInfoChanged = false;
auto messages = fromScheduler->streamReader().get(fromScheduler->streamReader().available());
for (const auto &message : messages) {
const auto updateValue = messageField<std::string>(message, kind::SchedulerStateUpdate);
if (updateValue == magic_enum::enum_name(lifecycle::State::STOPPED)) {
schedulerFinished = true;
continue;
}

const auto kind = messageField<std::string>(message, key::Kind);

if (kind == kind::SettingsChanged) {
const auto &settings = messageField<gr::property_map>(message, key::Data);
const auto &sender = messageField<std::string>(message, key::Sender);
if (!settings || !sender) {
continue;
}
auto sinkIt = signalEntryBySink.find(*sender);
if (sinkIt == signalEntryBySink.end()) {
continue;
}
auto &entry = sinkIt->second;

const auto signal_name = detail::get<std::string>(*settings, "signal_name");
const auto signal_unit = detail::get<std::string>(*settings, "signal_unit");
const auto sample_rate = detail::get<float>(*settings, "sample_rate");
if (signal_name && signal_name != entry.name) {
entry.name = *signal_name;
signalInfoChanged = true;
}
if (signal_unit && signal_unit != entry.unit) {
entry.unit = *signal_unit;
signalInfoChanged = true;
}
if (sample_rate && sample_rate != entry.sample_rate) {
entry.sample_rate = *sample_rate;
signalInfoChanged = true;
}
}
}

std::ignore = messages.consume(messages.size());
std::ignore = messages.consume(messages.size());

if (signalInfoChanged && _updateSignalEntriesCallback) {
std::vector<SignalEntry> entries;
entries.reserve(signalEntryBySink.size());
for (const auto &[_, entry] : signalEntryBySink) {
entries.push_back(entry);
}
_updateSignalEntriesCallback(std::move(entries));
}

bool pollersFinished = true;
do {
Expand All @@ -212,6 +284,10 @@ class GnuRadioAcquisitionWorker : public Worker<serviceName, TimeDomainContext,
}

if (stopScheduler || schedulerFinished) {
if (_updateSignalEntriesCallback) {
_updateSignalEntriesCallback({});
}
signalEntryBySink.clear();
streamingPollers.clear();
dataSetPollers.clear();
fromScheduler.reset();
Expand All @@ -225,6 +301,22 @@ class GnuRadioAcquisitionWorker : public Worker<serviceName, TimeDomainContext,
}

if (pendingFlowGraph) {
pendingFlowGraph->forEachBlock([&signalEntryBySink](const auto &block) {
if (block.typeName().starts_with("gr::basic::DataSink")) {
auto &entry = signalEntryBySink[std::string(block.uniqueName())];
entry.name = detail::getSetting<std::string>(block, "signal_name").value_or("");
entry.unit = detail::getSetting<std::string>(block, "signal_unit").value_or("");
entry.sample_rate = detail::getSetting<float>(block, "sample_rate").value_or(1.f);
}
});
if (_updateSignalEntriesCallback) {
std::vector<SignalEntry> entries;
entries.reserve(signalEntryBySink.size());
for (const auto &[_, entry] : signalEntryBySink) {
entries.push_back(entry);
}
_updateSignalEntriesCallback(std::move(entries));
}
auto sched = std::make_unique<scheduler::Simple<scheduler::multiThreaded>>(std::move(*pendingFlowGraph));
toScheduler = std::make_unique<MsgPortOut>();
fromScheduler = std::make_unique<MsgPortIn>();
Expand Down
78 changes: 69 additions & 9 deletions src/service/gnuradio/test/qa_GnuRadioWorker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,11 +87,13 @@ struct TestSetup {
zmq::Context ctx;
client::ClientContext client = makeClient(ctx);

TestSetup() {
explicit TestSetup(std::function<void(std::vector<SignalEntry>)> dnsCallback = {}) {
const auto brokerPubAddress = broker.bind(URI<>("mds://127.0.0.1:12345"));
expect((brokerPubAddress.has_value() == "bound successful"_b));
const auto brokerRouterAddress = broker.bind(URI<>("mdp://127.0.0.1:12346"));
expect((brokerRouterAddress.has_value() == "bound successful"_b));
acqWorker.setUpdateSignalEntriesCallback(std::move(dnsCallback));

brokerThread = std::jthread([this] { broker.run(); });
acqWorkerThread = std::jthread([this] { acqWorker.run(); });
fgWorkerThread = std::jthread([this] { fgWorker.run(); });
Expand Down Expand Up @@ -181,7 +183,12 @@ const boost::ut::suite GnuRadioWorker_tests = [] {
- [count_down, 0, delay_down, 0]
- [delay_down, 0, test_sink_down, 0]
)";
TestSetup test;
std::vector<SignalEntry> lastDnsEntries;
TestSetup test([&lastDnsEntries](auto entries) {
if (!entries.empty()) {
lastDnsEntries = std::move(entries);
}
});

constexpr std::size_t kExpectedSamples = 100;
const std::vector<float> expectedUpData = getIota(kExpectedSamples);
Expand Down Expand Up @@ -219,6 +226,12 @@ const boost::ut::suite GnuRadioWorker_tests = [] {
expect(eq(receivedUpData, expectedUpData));
expect(eq(receivedDownData.size(), kExpectedSamples));
expect(eq(receivedDownData, expectedDownData));
expect(eq(lastDnsEntries.size(), 2UZ));
std::ranges::sort(lastDnsEntries, {}, &SignalEntry::name);
expect(eq(lastDnsEntries[0].name, "count_down"sv));
expect(eq(lastDnsEntries[0].unit, "down unit"sv));
expect(eq(lastDnsEntries[1].name, "count_up"sv));
expect(eq(lastDnsEntries[1].unit, "up unit"sv));
};

"Flow graph management"_test = [] {
Expand Down Expand Up @@ -321,7 +334,14 @@ const boost::ut::suite GnuRadioWorker_tests = [] {
- [source, 0, test_sink, 0]
)";

TestSetup test;
std::mutex dnsMutex;
std::vector<SignalEntry> lastDnsEntries;
TestSetup test([&lastDnsEntries, &dnsMutex](auto entries) {
if (!entries.empty()) {
std::lock_guard lock(dnsMutex);
lastDnsEntries = std::move(entries);
}
});

std::atomic<std::size_t> receivedCount1 = 0;
test.subscribeClient(URI("mds://127.0.0.1:12345/GnuRadio/Acquisition?channelNameFilter=test1"), [&receivedCount1](const auto &acq) {
Expand All @@ -335,12 +355,21 @@ const boost::ut::suite GnuRadioWorker_tests = [] {

std::this_thread::sleep_for(50ms);
test.setGrc(grc1);

std::this_thread::sleep_for(1200ms);

{
std::lock_guard lock(dnsMutex);
expect(eq(lastDnsEntries.size(), 1UZ));
expect(eq(lastDnsEntries[0].name, "test1"sv));
}
test.setGrc(grc2);

constexpr auto kExpectedSamples = 50000UZ;
waitWhile([&] { return receivedCount1 < kExpectedSamples || receivedCount2 < kExpectedSamples; });

std::lock_guard lock(dnsMutex);
expect(eq(lastDnsEntries.size(), 1UZ));
expect(eq(lastDnsEntries[0].name, "test2"sv));
};

"Trigger - tightly packed tags"_test = [] {
Expand Down Expand Up @@ -435,6 +464,7 @@ const boost::ut::suite GnuRadioWorker_tests = [] {
id: CountSource
parameters:
n_samples: 100
signal_unit: A unit
sample_rate: 10
timing_tags:
- 30,hello
Expand All @@ -452,7 +482,12 @@ const boost::ut::suite GnuRadioWorker_tests = [] {
- [count, 0, delay, 0]
- [delay, 0, test_sink, 0]
)";
TestSetup test;
std::vector<SignalEntry> lastDnsEntries;
TestSetup test([&lastDnsEntries](auto entries) {
if (!entries.empty()) {
lastDnsEntries = std::move(entries);
}
});

std::vector<float> receivedData;
std::atomic<std::size_t> receivedCount = 0;
Expand All @@ -469,6 +504,10 @@ const boost::ut::suite GnuRadioWorker_tests = [] {
waitWhile([&] { return receivedCount < 20; });

expect(eq(receivedData, getIota(20, 50)));
expect(eq(lastDnsEntries.size(), 1UZ));
expect(eq(lastDnsEntries[0].name, "count"sv));
expect(eq(lastDnsEntries[0].unit, "A unit"sv));
expect(eq(lastDnsEntries[0].sample_rate, 10.f));
};

"Snapshot"_test = [] {
Expand Down Expand Up @@ -498,7 +537,13 @@ const boost::ut::suite GnuRadioWorker_tests = [] {
- [count, 0, delay, 0]
- [delay, 0, test_sink, 0]
)";
TestSetup test;
std::vector<SignalEntry> lastDnsEntries;

TestSetup test([&lastDnsEntries](auto entries) {
if (!entries.empty()) {
lastDnsEntries = std::move(entries);
}
});

std::vector<float> receivedData;
std::atomic<std::size_t> receivedCount = 0;
Expand All @@ -519,6 +564,10 @@ const boost::ut::suite GnuRadioWorker_tests = [] {

// trigger + delay * sample_rate = 50 + 3 * 10 = 80
expect(eq(receivedData, std::vector{ 80.f }));
expect(eq(lastDnsEntries.size(), 1UZ));
expect(eq(lastDnsEntries[0].name, "count"sv));
expect(eq(lastDnsEntries[0].unit, "A unit"sv));
expect(eq(lastDnsEntries[0].sample_rate, 10.f));
};

"Flow graph handling - Unknown block"_test = [] {
Expand Down Expand Up @@ -582,11 +631,16 @@ const boost::ut::suite GnuRadioWorker_tests = [] {

// Here we rely on the signal_name propagation from the sources to the sinks. As that only happens at execution time, there's a delay between
// the flowgraph execution starting and the listener registration succeeding, thus we don't get all the signal data from the start.
std::vector<float> receivedUpData;
std::vector<float> receivedDownData;
std::vector<float> receivedUpData;
std::vector<float> receivedDownData;
std::vector<SignalEntry> lastDnsEntries;

{
TestSetup test;
TestSetup test([&lastDnsEntries](auto entries) {
if (!entries.empty()) {
lastDnsEntries = std::move(entries);
}
});

test.subscribeClient(URI("mds://127.0.0.1:12345/GnuRadio/Acquisition?channelNameFilter=count_up"), [&receivedUpData](const Acquisition &acq) {
expect(eq(acq.channelName.value(), "count_up"sv));
Expand All @@ -608,6 +662,12 @@ const boost::ut::suite GnuRadioWorker_tests = [] {
}
expect(!receivedUpData.empty());
expect(!receivedDownData.empty());
std::ranges::sort(lastDnsEntries, {}, &SignalEntry::name);
expect(eq(lastDnsEntries.size(), 2UZ));
expect(eq(lastDnsEntries[0].name, "count_down"sv));
expect(eq(lastDnsEntries[0].unit, "Test unit B"sv));
expect(eq(lastDnsEntries[1].name, "count_up"sv));
expect(eq(lastDnsEntries[1].unit, "Test unit A"sv));
};
};

Expand Down
Loading

0 comments on commit e4b76ef

Please sign in to comment.