Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add (client only) support for rda3 #348

Draft
wants to merge 12 commits into
base: main
Choose a base branch
from
12 changes: 6 additions & 6 deletions .github/workflows/build_cmake.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,26 +20,26 @@ jobs:
matrix:
configurations:
- name: Ubuntu Latest gcc13
os: ubuntu-22.04
os: ubuntu-24.04
compiler: gcc13
- name: Ubuntu Latest clang17
os: ubuntu-22.04
os: ubuntu-24.04
compiler: clang17
- name: ubuntu-22.04 emscripten
os: ubuntu-22.04
- name: ubuntu-24.04 emscripten
os: ubuntu-24.04
compiler: emscripten
# Customize the CMake build type here (Release, Debug, RelWithDebInfo, etc.)
cmake-build-type: [ Release, Debug ]
env:
BUILD_WRAPPER_OUT_DIR: build_wrapper_output_directory

steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4
with:
fetch-depth: 100

- name: Cache
uses: actions/cache@v3
uses: actions/cache@v4
env:
cache-name: cache-fetchContent-cache
with:
Expand Down
882 changes: 882 additions & 0 deletions src/client/include/CmwLightClient.hpp

Large diffs are not rendered by default.

171 changes: 171 additions & 0 deletions src/client/include/DirectoryLightClient.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
#ifndef OPENCMW_CPP_DIRECTORYLIGHTCLIENT_HPP
#define OPENCMW_CPP_DIRECTORYLIGHTCLIENT_HPP

#include <map>
#include <ranges>
#include <string_view>
#include <zmq/ZmqUtils.hpp>

using namespace std::chrono_literals;

auto parse = [](const std::string &reply) {
auto urlDecode = [](std::string str) {
std::string ret;
ret.reserve(str.length());
char ch;
std::size_t len = str.length();
for (std::size_t i = 0; i < len; i++) {
if (str[i] != '%') {
if (str[i] == '+')
ret += ' ';
else
ret += str[i];
} else if (i + 2 < len) {
auto toHex = [](char c) {
if (c >= '0' && c <= '9') return c - '0';
if (c >= 'a' && c <= 'f') return c - 'a' + 10;
if (c >= 'A' && c <= 'F') return c - 'A' + 10;
throw std::runtime_error("Invalid hexadecimal number");
};
ch = static_cast<char>('\x10' * toHex(str.at(i + 1)) + toHex(str.at(i + 2)));
ret += ch;
i = i + 2;
}
}
return ret;
};
using std::operator""sv;
std::map<std::string, std::map<std::string, std::map<std::string, std::variant<std::string, int, long>>>> devices;
if (reply.starts_with("ERROR")) {
throw std::runtime_error("Nameserver returned an error");
}
// each line: one device
// auto lines = reply | std::views::lazy_split("\n"sv);
std::ranges::split_view lines{ reply, "\n"sv };
// auto tokens = lines | std::views::transform([](auto &l) {return std::views::split(" "sv);});
auto split_lines = std::views::transform([](auto str) { return std::ranges::split_view{ str, " "sv }; });
for (auto l : lines | split_lines) {
if (l.empty()) {
continue;
}
std::string devicename{ std::string_view{ l.front().data(), l.front().size() } };
auto l2 = std::views::drop(l, 1);
if (l2.empty()) {
devices.insert({ devicename, {} });
continue;
}
std::string classname{ std::string_view{ l2.front().data(), l2.front().size() } };
if (classname.starts_with("*NOT_BOUND*") || classname.starts_with("*UNKNOWN*")) {
devices.insert({ devicename, {} });
continue;
}
auto l3 = std::views::drop(l2, 1);
if (l3.empty()) {
devices.insert({ devicename, {} });
continue;
}
std::map<std::string, std::map<std::string, std::variant<std::string, int, long>>> attributes{};
for (auto attributeString : l3) {
auto tokens = std::views::split(attributeString, "#"sv);
if (tokens.empty()) {
continue;
}
std::string addresfieldcount = { tokens.front().data(), tokens.front().size() };
auto seperatorPos = addresfieldcount.find("://");
std::string proto = addresfieldcount.substr(0, seperatorPos + 3);
std::size_t i;
char *end = to_address(addresfieldcount.end());
std::size_t fieldCount = std::strtoull(addresfieldcount.data() + seperatorPos + 3, &end, 10);
auto [map, _] = attributes.insert({ proto, {} });
map->second.insert({ "Classname", classname });

auto range = std::views::drop(tokens, 1);
auto iterator = range.begin();
std::size_t n = 0;
while (n < fieldCount) {
std::string_view fieldNameView{ &(*iterator).front(), (*iterator).size() };
std::string fieldname{ fieldNameView.substr(0, fieldNameView.size() - 1) };
iterator++;
std::string type{ std::string_view{ &(*iterator).front(), (*iterator).size() } };
if (type == "string") {
iterator++;
std::string sizeString{ std::string_view{ &(*iterator).front(), (*iterator).size() } };
auto parsed = std::to_address(sizeString.end());
std::size_t size = std::strtoull(sizeString.data(), &parsed, 10);
iterator++;
std::string string{ std::string_view{ &(*iterator).front(), (*iterator).size() } };
map->second.insert({ fieldname, urlDecode(string) });
} else if (type == "int") {
iterator++;
std::string sizeString{ std::string_view{ &(*iterator).front(), (*iterator).size() } };
int number = std::atoi(sizeString.data());
map->second.insert({ fieldname, number });
} else if (type == "long") {
iterator++;
std::string sizeString{ std::string_view{ &(*iterator).front(), (*iterator).size() } };
auto parsed = std::to_address(sizeString.end());
long number = std::strtol(sizeString.data(), &parsed, 10);
map->second.insert({ fieldname, number });
} else {
FAIL(fmt::format("unknown type: {}, field: {}, tokens: {}", type, fieldname, tokens));
}
iterator++;
n++;
}
}
devices.insert({ devicename, attributes });
}
return devices;
};

std::string resolveDirectoryLight(std::vector<std::string> devices, std::string_view nameserver, opencmw::zmq::Context &ctx, std::chrono::milliseconds timeout = 500ms) {
const opencmw::zmq::Socket socket{ ctx, ZMQ_STREAM };
if (!opencmw::zmq::invoke(zmq_connect, socket, nameserver).isValid()) {
throw std::runtime_error("could not connect to nameserver.");
}
std::string id;
std::size_t data_len = 255;
id.resize(data_len);
if (!opencmw::zmq::invoke(zmq_getsockopt, socket, ZMQ_IDENTITY, id.data(), &data_len).isValid()) {
throw std::runtime_error("could not get socket identity");
}
id.resize(data_len);

const std::string query = fmt::format("get-device-info\n@client-info opencmw-cpp-directory-light-client\n@version 0.0.1\n{}\n\n", fmt::join(devices, "\n"));

opencmw::zmq::MessageFrame identityFrame{ std::string{ id } };
if (!identityFrame.send(socket, ZMQ_SNDMORE).isValid()) {
throw std::runtime_error("error sending socket id");
}
opencmw::zmq::MessageFrame queryFrame{ std::string{ query } };
if (!queryFrame.send(socket, 0).isValid()) {
throw std::runtime_error("error sending query frame");
}

auto start_time = std::chrono::system_clock::now();
std::string result;
bool data_received = false;
while ((result.empty() || (data_received && !result.empty())) && std::chrono::system_clock::now() - start_time < timeout) { // wait for a maximum of 5 seconds
data_received = false;
opencmw::zmq::MessageFrame idFrame;
const auto byteCountResultId = idFrame.receive(socket, ZMQ_DONTWAIT);
if (byteCountResultId.value() < 1) {
continue;
}
if (idFrame.data() != id) {
throw std::runtime_error("connection identifier from socket does not match connection");
}
opencmw::zmq::MessageFrame frame;
for (auto byteCountResult = frame.receive(socket, ZMQ_DONTWAIT); byteCountResult.value() < 0; byteCountResult = frame.receive(socket, ZMQ_DONTWAIT)) {
}
if (frame.size() > 0) {
result += frame.data();
data_received = true;
}
}
if (!opencmw::zmq::invoke(zmq_disconnect, socket, nameserver).isValid()) {
throw std::runtime_error("could not disconnect");
}
return result;
}
#endif // OPENCMW_CPP_DIRECTORYLIGHTCLIENT_HPP
10 changes: 10 additions & 0 deletions src/client/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,16 @@ if(NOT EMSCRIPTEN)
# TEST_PREFIX to whatever you want, or use different for different binaries catch_discover_tests(client_tests
# TEST_PREFIX "unittests." REPORTER xml OUTPUT_DIR . OUTPUT_PREFIX "unittests." OUTPUT_SUFFIX .xml)
catch_discover_tests(clientPublisher_tests)

add_executable(CmwLightTest catch_main.cpp CmwLightTest.cpp)
target_link_libraries(
CmwLightTest
PUBLIC opencmw_project_warnings
opencmw_project_options
Catch2::Catch2
client)
target_include_directories(CmwLightTest PRIVATE ${CMAKE_SOURCE_DIR})
catch_discover_tests(CmwLightTest)
endif()

add_executable(rest_client_only_tests RestClientOnly_tests.cpp)
Expand Down
85 changes: 85 additions & 0 deletions src/client/test/CmwLightTest.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
#include <catch2/catch.hpp>
#include <Client.hpp>
#include <CmwLightClient.hpp>
#include <DirectoryLightClient.hpp>
#include <fmt/core.h>

TEST_CASE("RDA3", "[Client]") {
std::string nameserverExample = R"""(GSCD025 DigitizerDU2.dal025 rda3://9#Address:#string#18#tcp:%2F%2Fdal025:16134#ApplicationId:#string#114#app=DigitizerDU2;uid=root;host=dal025;pid=16912;os=Linux%2D3%2E10%2E101%2Drt111%2Dscu03;osArch=64bit;appArch=64bit;lang=C%2B%2B;#Language:#string#3#C%2B%2B#Name:#string#19#DigitizerDU2%2Edal025#Pid:#int#16912#ProcessName:#string#12#DigitizerDU2#StartTime:#long#1699343695922#UserName:#string#4#root#Version:#string#5#3%2E1%2E0
GSCD023 DigitizerDU2.fel0053 rda3://9#Address:#string#18#tcp:%2F%2Ffel0053:3717#ApplicationId:#string#115#app=DigitizerDU2;uid=root;host=fel0053;pid=31447;os=Linux%2D3%2E10%2E101%2Drt111%2Dscu03;osArch=64bit;appArch=64bit;lang=C%2B%2B;#Language:#string#3#C%2B%2B#Name:#string#20#DigitizerDU2%2Efel0053#Pid:#int#31447#ProcessName:#string#12#DigitizerDU2#StartTime:#long#1701529074225#UserName:#string#4#root#Version:#string#5#3%2E1%2E0
FantasyDevice3000 *UNKNOWN* *UNKNOWN*)""";
std::string nameserver = "tcp://cmwpro00a.acc.gsi.de:5021";

SECTION("ParseNameserverReply") {
std::map<std::string, std::map<std::string, std::map<std::string, std::variant<std::string, int, long>>>> devices = parse(nameserverExample);
REQUIRE(!devices["GSCD023"].empty());
REQUIRE(!devices["GSCD025"].empty());
REQUIRE(devices["FantasyDevice3000"].empty());
REQUIRE(std::get<std::string>(devices["GSCD025"]["rda3://"]["Address"]) == "tcp://dal025:16134");
}

SECTION("Query rda3 directory server/nameserver") {
opencmw::zmq::Context ctx{};
auto result = resolveDirectoryLight({ "GSCD025", "GSCD023", "FantasyDevice3000" }, nameserver, ctx, 100ms);
REQUIRE(!result.empty());
REQUIRE(result == nameserverExample);
};
}

// small utility function that prints the content of a string in the classic hexedit way with address, hexadecimal and ascii representations
static std::string hexview(const std::string_view value, std::size_t bytesPerLine = 4) {
std::string result;
result.reserve(value.size() * 4);
std::string alpha; // temporarily store the ascii representation
alpha.reserve(8 * bytesPerLine);
std::size_t i = 0;
for (auto c : value) {
if (static_cast<std::size_t>(i) % (bytesPerLine * 8) == 0) {
result.append(fmt::format("{0:#08x} - {0:04} | ", i)); // print address in hex and decimal
}
result.append(fmt::format("{:02x} ", c));
alpha.append(fmt::format("{}", std::isprint(c) ? c : '.'));
if (static_cast<std::size_t>(i + 1) % 8 == 0) {
result.append(" ");
alpha.append(" ");
}
if (static_cast<std::size_t>(i + 1) % (bytesPerLine * 8) == 0) {
result.append(fmt::format(" {}\n", alpha));
alpha.clear();
}
i++;
}
result.append(fmt::format("{:{}} {}\n", "", 3 * (9 * bytesPerLine - alpha.size()), alpha));
return result;
};

TEST_CASE("BasicCmwLight example", "[Client]") {
const std::string digitizerAddress{ "tcp://dal007:2620" };
// filters2String = "acquisitionModeFilter=int:0&channelNameFilter=GS11MU2:Voltage_1@10Hz";
// subscribe("r1", new URI("rda3", null, '/' + DEVICE + '/' + PROPERTY, "ctx=" + SELECTOR + "&" + filtersString, null), null);
// DEVICE = "GSCD002";
// PROPERTY = "AcquisitionDAQ";
// SELECTOR = "FAIR.SELECTOR.ALL";
using namespace opencmw;
const zmq::Context zctx{};
std::vector<std::unique_ptr<opencmw::client::ClientBase>> clients;
clients.emplace_back(std::make_unique<opencmw::client::cmwlight::CmwLightClientCtx>(zctx, 20ms, "testMajordomoClient"));
opencmw::client::ClientContext clientContext{ std::move(clients) };
// send some requests
auto endpoint = URI<STRICT>::factory(URI<STRICT>(digitizerAddress)).scheme("rda3tcp").path("/GSCD002/Version").build();

std::atomic<int> received{ 0 };
clientContext.get(endpoint, [&received](const mdp::Message &message) {
fmt::print("{}", hexview(message.data.asString()));
received++;
});

auto subscriptionEndpoint = URI<STRICT>::factory(URI<STRICT>(digitizerAddress)).scheme("rda3tcp").path("/GSCD002/AcquisitionDAQ").addQueryParameter("ctx", "FAIR.SELECTOR.ALL").build();
clientContext.subscribe(endpoint, [&received](const mdp::Message &message) {
fmt::print("{}", hexview(message.data.asString()));
received++;
});

std::this_thread::sleep_for(8000ms); // allow the request to reach the server
REQUIRE(received == 1);
}
9 changes: 5 additions & 4 deletions src/core/include/SpinWait.hpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#ifndef SPIN_WAIT_HPP
#define SPIN_WAIT_HPP

#include <atomic>
#include <cstdint>
#include <functional>
#include <thread>
Expand Down Expand Up @@ -67,12 +68,12 @@ class SpinWait {
void reset() noexcept { _count = 0; }

template<typename T>
requires std::is_nothrow_invocable_r_v<bool, T>
requires std::is_nothrow_invocable_r_v<bool, T>
bool
spinUntil(const T &condition) const { return spinUntil(condition, -1); }

template<typename T>
requires std::is_nothrow_invocable_r_v<bool, T>
requires std::is_nothrow_invocable_r_v<bool, T>
bool
spinUntil(const T &condition, std::int64_t millisecondsTimeout) const {
if (millisecondsTimeout < -1) {
Expand Down Expand Up @@ -111,8 +112,8 @@ class AtomicMutex {
SPIN_WAIT _spin_wait;

public:
AtomicMutex() = default;
AtomicMutex(const AtomicMutex &) = delete;
AtomicMutex() = default;
AtomicMutex(const AtomicMutex &) = delete;
AtomicMutex &operator=(const AtomicMutex &) = delete;

//
Expand Down
Loading
Loading