Skip to content

Commit

Permalink
WIP Make remote sources a regular GR block
Browse files Browse the repository at this point in the history
Remove special treatment for remote sources, make it a regular GR
block.

TODO:
 - readd remote flowgraph handling
 - readd adding new sources from the UI (wait for messages?)
  • Loading branch information
frankosterfeld committed Jan 31, 2024
1 parent 1bb53be commit ee22108
Show file tree
Hide file tree
Showing 8 changed files with 111 additions and 273 deletions.
8 changes: 4 additions & 4 deletions src/service/dashboard/defaultDashboard.flowgraph
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@ blocks:
- name: source for sink 2
id: sink_source
- name: remote source 1
id: http://localhost:8080/GnuRadio/Acquisition?channelNameFilter=test
id: opendigitizer::RemoteSource
parameters:
remote_uri: http://localhost:8080/GnuRadio/Acquisition?channelNameFilter=test
signal_name: test
- name: sink 1
id: sink
- name: sink 2
Expand All @@ -29,6 +32,3 @@ connections:
- [sum sigs, 0, sink 2, 0]
- [sine source 1, 0, sink 3, 0]
- [remote source 1, 0, sink 4, 0]
remote_sources:
- uri: http://localhost:8080/GnuRadio/Acquisition?channelNameFilter=test
signal_name: test
1 change: 0 additions & 1 deletion src/ui/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ set(sources
dashboard.cpp
flowgraph/datasource.cpp
flowgraph/datasink.cpp
flowgraph/remotedatasource.cpp
flowgraph/arithmetic_block.cpp
dashboardpage.cpp
opendashboardpage.cpp
Expand Down
101 changes: 101 additions & 0 deletions src/ui/RemoteSource.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
#ifndef OPENDIGITIZER_REMOTESOURCE_HPP
#define OPENDIGITIZER_REMOTESOURCE_HPP

#include <gnuradio-4.0/Block.hpp>

#include <daq_api.hpp>

#include <IoSerialiserYaS.hpp>
#include <MdpMessage.hpp>
#include <opencmw.hpp>
#include <RestClient.hpp>
#include <type_traits>

namespace opendigitizer {

template<typename T>
requires std::is_same_v<T, float>
struct RemoteSource : public gr::Block<RemoteSource<T>> {
gr::PortOut<float> out;
std::string remote_uri;
std::string signal_name;
opencmw::client::RestClient m_client;

struct Data {
opendigitizer::acq::Acquisition data;
std::size_t read = 0;
};

std::deque<Data> m_data;
std::mutex m_mutex;

void append(opendigitizer::acq::Acquisition &&data) {
std::lock_guard lock(m_mutex);
m_data.push_back({ std::move(data), 0 });
}

auto processBulk(gr::PublishableSpan auto &output) noexcept {
std::size_t written = 0;
std::lock_guard lock(m_mutex);
while (written < output.size() && !m_data.empty()) {
auto &d = m_data.front();
auto in = std::span<const float>(d.data.channelValue.begin(), d.data.channelValue.end());
in = in.subspan(d.read, std::min(output.size() - written, in.size() - d.read));

std::copy(in.begin(), in.end(), output.begin() + written);
written += in.size();
d.read += in.size();
if (d.read == d.data.channelValue.size()) {
m_data.pop_front();
}
}
output.publish(written);
return gr::work::Status::OK;
}

void
settingsChanged(const gr::property_map &old_settings, const gr::property_map & /*new_settings*/) {
const auto oldValue = old_settings.find("remote_uri");
if (oldValue != old_settings.end()) {
const auto oldUri = std::get<std::string>(oldValue->second);
if (!oldUri.empty()) {
fmt::print("Unsubscribing from {}\n", oldUri);
opencmw::client::Command command;
command.command = opencmw::mdp::Command::Unsubscribe;
command.topic = opencmw::URI<>(remote_uri);
command.callback = [oldUri](const opencmw::mdp::Message &) {
// TODO: Add cleanup once openCMW starts calling the callback
// on successful unsubscribe
fmt::print("Unsubscribed from {} successfully\n", oldUri);
};
}
}

opencmw::client::Command command;
command.command = opencmw::mdp::Command::Subscribe;
command.topic = opencmw::URI<>(remote_uri);
fmt::print("Subscribing to {}\n", remote_uri);

command.callback = [this](const opencmw::mdp::Message &rep) {
if (rep.data.empty()) {
return;
}
try {
auto buf = rep.data;
opendigitizer::acq::Acquisition acq;
opencmw::deserialise<opencmw::YaS, opencmw::ProtocolCheck::IGNORE>(buf, acq);
append(std::move(acq));
} catch (opencmw::ProtocolException &e) {
fmt::print(std::cerr, "{}\n", e.what());
return;
}
};
m_client.request(command);
}
};

} // namespace opendigitizer

ENABLE_REFLECTION_FOR_TEMPLATE(opendigitizer::RemoteSource, out, remote_uri, signal_name)

#endif
39 changes: 4 additions & 35 deletions src/ui/flowgraph.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@

#include <fmt/format.h>

#include "flowgraph/remotedatasource.h"
#include "yamlutils.h"
#include <yaml-cpp/yaml.h>

Expand Down Expand Up @@ -285,17 +284,7 @@ void FlowGraph::parse(const std::filesystem::path &file) {
void FlowGraph::parse(const std::string &str) {
clear();

YAML::Node tree = YAML::Load(str);

auto rsources = tree["remote_sources"];
if (rsources && rsources.IsSequence()) {
for (const auto &s : rsources) {
auto uri = s["uri"].as<std::string>();
auto signalName = s["signal_name"].as<std::string>();

RemoteDataSource::registerBlockType(this, uri, signalName);
}
}
YAML::Node tree = YAML::Load(str);

auto blocks = tree["blocks"];
for (const auto &b : blocks) {
Expand All @@ -306,8 +295,6 @@ void FlowGraph::parse(const std::string &str) {

auto type = BlockType::registry().get(id);
if (!type) {
std::cerr << "Block type '" << id << "' is unkown.\n";

auto block = std::make_unique<GRBlock>(n, id, type);
m_blocks.push_back(std::move(block));
continue;
Expand Down Expand Up @@ -379,7 +366,9 @@ void FlowGraph::clear() {
m_sourceBlocks.clear();
m_sinkBlocks.clear();
m_connections.clear();
#if 0
m_remoteSources.clear();
#endif
}

int FlowGraph::save(std::ostream &stream) {
Expand Down Expand Up @@ -427,21 +416,6 @@ int FlowGraph::save(std::ostream &stream) {
}
});
}

if (!m_remoteSources.empty()) {
root.write("remote_sources", [&]() {
YamlSeq sources(out);
for (const auto &s : m_remoteSources) {
YamlMap map(out);
map.write("uri", s.uri);
// we need to save down the name of the signal (and in the future probably other stuff) because when we load
// having to wait for information from the servers about all the remote signals used by the flowgraph would
// not be ideal. Moreover, this way a flowgraph can be loaded even when some signal isn't available at the
// moment. There will be no data but the flowgraph will load correctly anyway.
map.write("signal_name", s.type->outputs[0].name);
}
});
}
}

stream << out.c_str();
Expand Down Expand Up @@ -568,12 +542,7 @@ void FlowGraph::disconnect(Connection *c) {
}

void FlowGraph::addRemoteSource(std::string_view uri) {
RemoteDataSource::registerBlockType(this, uri);
}

void FlowGraph::registerRemoteSource(std::unique_ptr<BlockType> &&type, std::string_view uri) {
m_remoteSources.push_back({ type.get(), std::string(uri) });
BlockType::registry().addBlockType(std::move(type));
// TODO create block, etc.
}

gr::Graph FlowGraph::createGraph() {
Expand Down
1 change: 0 additions & 1 deletion src/ui/flowgraph.h
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,6 @@ class FlowGraph {

int save(std::ostream &stream);
void addRemoteSource(std::string_view uri);
void registerRemoteSource(std::unique_ptr<BlockType> &&type, std::string_view uri);

std::function<void(Block *)> sourceBlockAddedCallback;
std::function<void(Block *)> sinkBlockAddedCallback;
Expand Down
Loading

0 comments on commit ee22108

Please sign in to comment.