Skip to content

Commit

Permalink
Implement all the modes, add demo application
Browse files Browse the repository at this point in the history
  • Loading branch information
frankosterfeld committed Oct 27, 2023
1 parent 5cac48e commit b0bcf2d
Show file tree
Hide file tree
Showing 9 changed files with 675 additions and 217 deletions.
2 changes: 1 addition & 1 deletion src/service/cmake/Dependencies.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ include(FetchContent)

FetchContent_Declare(
opencmw-cpp
GIT_REPOSITORY git@github.com:frankosterfeld/opencmw-cpp.git
GIT_REPOSITORY https://github.com/fair-acc/opencmw-cpp.git
GIT_TAG frank/opendigitizer-service
)

Expand Down
331 changes: 262 additions & 69 deletions src/service/gnuradio/GnuRadioWorker.hpp

Large diffs are not rendered by default.

11 changes: 8 additions & 3 deletions src/service/gnuradio/test/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
cmrc_add_resource_library(GNURADIO_WORKER_TEST_ASSETS ALIAS assets::grc NAMESPACE gnuradioWorkerTestFilesystem simple-counter.grc)

add_executable(GnuRadioWorkerTest GnuRadioWorker_test.cpp)
target_link_libraries(GnuRadioWorkerTest PRIVATE fmt ut od_gnuradio_worker client zmq assets::grc)
target_link_libraries(GnuRadioWorkerTest PRIVATE fmt ut od_gnuradio_worker client zmq)

add_executable(GnuRadioWorkerDemo GnuRadioWorker_demo.cpp)
target_link_libraries(GnuRadioWorkerDemo PRIVATE fmt od_gnuradio_worker client zmq)

if(NOT EMSCRIPTEN)
target_link_libraries(GnuRadioWorkerDemo PRIVATE fair-picoscope)
endif()
100 changes: 100 additions & 0 deletions src/service/gnuradio/test/CountSource.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
#ifndef COUNTSOURCE_HPP
#define COUNTSOURCE_HPP

#include <deque>

template<typename T>
struct CountSource : public gr::Block<CountSource<T>> {
using clock = std::chrono::system_clock;
gr::PortOut<T> out;

std::size_t n_samples = 0;
std::string signal_name = "test signal";
std::string signal_unit = "test unit";
double sample_rate = 0.;
std::string direction = "up";
std::vector<std::string> timing_tags;
std::size_t _produced = 0;
std::deque<gr::Tag> _pending_tags;
std::optional<clock::time_point> _first_request;
bool _waiting = true;

void
settingsChanged(const gr::property_map & /*old_settings*/, const gr::property_map & /*new_settings*/) {
_produced = 0;
_pending_tags.clear();

gr::property_map channelInfo = gr::property_map{ { std::string(gr::tag::SAMPLE_RATE.key()), static_cast<float>(sample_rate) } };
if (!signal_name.empty())
channelInfo.emplace(std::string(gr::tag::SIGNAL_NAME.key()), signal_name);
if (!signal_unit.empty())
channelInfo.emplace(std::string(gr::tag::SIGNAL_UNIT.key()), signal_unit);
_pending_tags.emplace_back(0, channelInfo);

for (const auto &tagStr : timing_tags) {
auto view = tagStr | std::ranges::views::split(',');
const auto segs = std::vector(view.begin(), view.end());
if (segs.size() != 2) {
fmt::println(std::cerr, "Invalid tag: ''", tagStr);
continue;
}
const auto indexStr = std::string_view(segs[0].begin(), segs[0].end());
gr::Tag::signed_index_type index = 0;
if (const auto &[_, ec] = std::from_chars(indexStr.begin(), indexStr.end(), index); ec != std::errc{}) {
fmt::println("Invalid tag index '{}'", segs[0]);
continue;
}
_pending_tags.emplace_back(index, gr::property_map{ { std::string{ gr::tag::TRIGGER_NAME.key() }, std::string{ segs[1].begin(), segs[1].end() } } });
}
}

gr::work::Status
processBulk(gr::PublishableSpan auto &output) noexcept {
// From the first processBulk() call, wait some time to give the test clients time to subscribe
using enum gr::work::Status;
if (!_first_request) {
_first_request = clock::now();
output.publish(0);
return OK;
}
if (_waiting) {
const auto now = clock::now();
if (now - *_first_request < std::chrono::milliseconds(500)) {
output.publish(0);
return OK;
}
_waiting = false;
}
auto n = std::min(output.size(), n_samples - _produced);
// chunk data so that there's one tag max, at index 0 in the chunk
auto tagIt = _pending_tags.begin();
if (tagIt != _pending_tags.end()) {
if (static_cast<std::size_t>(tagIt->index) == _produced) {
tagIt++;
}
if (tagIt != _pending_tags.end()) {
n = std::min(n, static_cast<std::size_t>(tagIt->index) - _produced);
}
}
if (!_pending_tags.empty() && _pending_tags[0].index == static_cast<gr::Tag::signed_index_type>(_produced)) {
this->output_tags()[0] = { 0, _pending_tags[0].map };
_pending_tags.pop_front();
this->forward_tags();
}

const auto subspan = std::span(output.begin(), output.end()).first(n);
if (direction == "up") {
std::iota(subspan.begin(), subspan.end(), static_cast<T>(_produced));
} else {
std::iota(subspan.begin(), subspan.end(), static_cast<T>(n_samples - _produced - n));
std::reverse(subspan.begin(), subspan.end());
}
output.publish(n);
_produced += n;
return _produced == n_samples ? DONE : OK;
}
};

ENABLE_REFLECTION_FOR_TEMPLATE(CountSource, out, n_samples, signal_name, signal_unit, sample_rate, direction, timing_tags);

#endif
69 changes: 69 additions & 0 deletions src/service/gnuradio/test/GnuRadioWorker_demo.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
#include <Client.hpp>
#include <majordomo/Broker.hpp>
#include <majordomo/Settings.hpp>
#include <majordomo/Worker.hpp>
#include <zmq/ZmqUtils.hpp>

#include <fmt/format.h>

#include <GnuRadioWorker.hpp>

#include <gnuradio-4.0/basic/common_blocks.hpp>
#include <gnuradio-4.0/basic/function_generator.h>
#include <gnuradio-4.0/basic/selector.hpp>

#include "CountSource.hpp"

#ifndef __EMSCRIPTEN__
#include <Picoscope4000a.hpp>
#endif

template<typename Registry>
void registerTestBlocks(Registry *registry) {
registerBuiltinBlocks(registry);
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wunused-variable"
GP_REGISTER_NODE_RUNTIME(registry, gr::basic::DataSink, double, float, int16_t);
GP_REGISTER_NODE_RUNTIME(registry, CountSource, double, float, int16_t);
#ifndef __EMSCRIPTEN__
GP_REGISTER_NODE_RUNTIME(registry, fair::picoscope::Picoscope4000a, double, float, int16_t);
#endif
#pragma GCC diagnostic pop
}

int main(int argc, char **argv) {
using namespace opencmw;
using namespace opendigitizer::acq;
using namespace std::chrono_literals;
if (argc < 3) {
std::cerr << "Usage: GnuRadioWorkerDemo <brokerRouterAddress> <grcFile>" << std::endl;
return 1;
}
const auto brokerAddress = URI<>(std::string(argv[1]));

std::ifstream in(argv[2]);
std::stringstream grcBuffer;
if (!(grcBuffer << in.rdbuf())) {
fmt::println(std::cerr, "Could not read GRC file: {}", strerror(errno));
return 1;
}

gr::BlockRegistry registry;
registerTestBlocks(&registry);
gr::plugin_loader pluginLoader(&registry, {});

majordomo::Broker broker("PrimaryBroker");
const auto boundAddress = broker.bind(brokerAddress);
if (!boundAddress) {
fmt::println(std::cerr, "Could not bind broker to address '{}'", argv[1]);
return 1;
}
fmt::println("Broker listens to {}", boundAddress->str());

using GrWorker = GnuRadioWorker<"/Hello/GnuRadio", description<"Provides data from a GnuRadio flow graph execution">>;
GrWorker grWorker(broker, &pluginLoader, grcBuffer.view(), 10ms);
std::jthread brokerThread([&broker] { broker.run(); });
std::jthread grWorkerThread([&grWorker] { grWorker.run(); });
brokerThread.join();
grWorkerThread.join();
}
Loading

0 comments on commit b0bcf2d

Please sign in to comment.