Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix subscriptions via REST when using params #331

Merged
merged 2 commits into from
Dec 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 81 additions & 17 deletions README.md

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion cmake/DependenciesNative.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ option(WITH_PERF_TOOL "Build with perf-tools" OFF)
FetchContent_Declare(
cpp-httplib
GIT_REPOSITORY https://github.com/yhirose/cpp-httplib.git
GIT_TAG v0.11.2 # latest v0.11.2
GIT_TAG v0.14.2 # latest v0.14.2
)

# zlib: optional httplib dependency
Expand Down
2 changes: 1 addition & 1 deletion concepts/client/RestSubscription_example.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ int main() {
data.put(0);
opencmw::client::Command command;
command.command = opencmw::mdp::Command::Subscribe;
command.endpoint = opencmw::URI<opencmw::STRICT>("http://localhost:8080/event");
command.topic = opencmw::URI<opencmw::STRICT>("http://localhost:8080/event");
command.data = std::move(data);
command.callback = [&received](const opencmw::mdp::Message &rep) {
fmt::print("SSE client received reply = '{}' - body size: '{}'\n", rep.data.asString(), rep.data.size());
Expand Down
4 changes: 2 additions & 2 deletions concepts/client/helpers.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ struct rest_test_step {
requires(command != opencmw::mdp::Command::Set)
: _client(client), _resultChecker(std::move(resultChecker)), _expectedRepliesCount(expectedRepliesCount) {
_command.command = command;
_command.endpoint = endpoint;
_command.topic = endpoint;
_command.callback = [this](const opencmw::mdp::Message &reply) {
fmt::print("Reply R\"({})\"\n", reply.data.asString());
if (_resultChecker && !_resultChecker(reply)) {
Expand All @@ -66,7 +66,7 @@ struct rest_test_step {
: _client(client) {
_command.command = command;
_command.data = opencmw::IoBuffer(new_data.data(), new_data.size());
_command.endpoint = endpoint;
_command.topic = endpoint;
_command.callback = [this](const opencmw::mdp::Message & /*reply*/) {
next_step();
};
Expand Down
10 changes: 5 additions & 5 deletions concepts/majordomo/FilterSubscription_example.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ class AcquisitionWorker : public Worker<serviceName, FilterContext, Empty, Reply
Reply subscriptionReply;
try {
handleGetRequest(filterIn, filterOut, subscriptionReply);
super_t::notify(std::string(serviceName.c_str()), filterOut, subscriptionReply);
super_t::notify(filterOut, subscriptionReply);
} catch (const std::exception &ex) {
fmt::print("caught specific exception '{}'\n", ex.what());
} catch (...) {
Expand All @@ -133,7 +133,7 @@ int main() {
using opencmw::URI;

// init Broker and one simple Worker
Broker broker("PrimaryBroker");
Broker broker("/PrimaryBroker");
if (!broker.bind(URI<>("mds://127.0.0.1:12345"))) {
std::cerr << "Could not bind to broker address" << std::endl;
return 1;
Expand All @@ -154,11 +154,11 @@ int main() {
std::atomic<int> receivedA{ 0 };
std::atomic<int> receivedAB{ 0 };
client.subscribe(URI("mds://127.0.0.1:12345/DeviceName/Acquisition?signalFilter=A"), [&receivedA](const opencmw::mdp::Message &update) {
fmt::print("Client('A') received message from service '{}' for endpoint '{}'\n", update.serviceName, update.endpoint.str());
fmt::print("Client('A') received message from service '{}' for endpoint '{}'\n", update.serviceName, update.topic.str());
receivedA++;
});
client.subscribe(URI("mds://127.0.0.1:12345/DeviceName/Acquisition?signalFilter=A,B"), [&receivedAB](const opencmw::mdp::Message &update) {
fmt::print("Client('A,B') received message from service '{}' for endpoint '{}'\n", update.serviceName, update.endpoint.str());
client.subscribe(URI("mds://127.0.0.1:12345/DeviceName/Acquisition?signalFilter=A%2CB"), [&receivedAB](const opencmw::mdp::Message &update) {
fmt::print("Client('A,B') received message from service '{}' for endpoint '{}'\n", update.serviceName, update.topic.str());
receivedAB++;
});

Expand Down
30 changes: 15 additions & 15 deletions concepts/majordomo/MajordomoRest_example.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ int main(int argc, char **argv) {

// note: inconsistency: brokerName as ctor argument, worker's serviceName as NTTP
// note: default roles different from java (has: ADMIN, READ_WRITE, READ_ONLY, ANYONE, NULL)
majordomo::Broker primaryBroker("PrimaryBroker", testSettings());
majordomo::Broker primaryBroker("/PrimaryBroker", testSettings());
opencmw::query::registerTypes(SimpleContext(), primaryBroker);

auto fs = cmrc::assets::get_filesystem();
Expand Down Expand Up @@ -72,27 +72,27 @@ int main(int argc, char **argv) {
});

// second broker to test DNS functionalities
majordomo::Broker secondaryBroker("SecondaryTestBroker", { .dnsAddress = brokerRouterAddress->str() });
majordomo::Broker secondaryBroker("/SecondaryTestBroker", { .dnsAddress = brokerRouterAddress->str() });
std::jthread secondaryBrokerThread([&secondaryBroker] {
secondaryBroker.run();
});
});

//
majordomo::Worker<"helloWorld", SimpleContext, SimpleRequest, SimpleReply, majordomo::description<"A friendly service saying hello">> helloWorldWorker(primaryBroker, HelloWorldHandler());
majordomo::Worker<"addressbook", SimpleContext, AddressRequest, AddressEntry> addressbookWorker(primaryBroker, TestAddressHandler());
majordomo::Worker<"addressbookBackup", SimpleContext, AddressRequest, AddressEntry> addressbookBackupWorker(primaryBroker, TestAddressHandler());
majordomo::BasicWorker<"beverages"> beveragesWorker(primaryBroker, TestIntHandler(10));
majordomo::Worker<"/helloWorld", SimpleContext, SimpleRequest, SimpleReply, majordomo::description<"A friendly service saying hello">> helloWorldWorker(primaryBroker, HelloWorldHandler());
majordomo::Worker<"/addressbook", SimpleContext, AddressRequest, AddressEntry> addressbookWorker(primaryBroker, TestAddressHandler());
majordomo::Worker<"/addressbookBackup", SimpleContext, AddressRequest, AddressEntry> addressbookBackupWorker(primaryBroker, TestAddressHandler());
majordomo::BasicWorker<"/beverages"> beveragesWorker(primaryBroker, TestIntHandler(10));

//
ImageServiceWorker<"testImage", majordomo::description<"Returns an image">> imageWorker(primaryBroker, std::chrono::seconds(10));
ImageServiceWorker<"/testImage", majordomo::description<"Returns an image">> imageWorker(primaryBroker, std::chrono::seconds(10));

//
RunInThread runHelloWorld(helloWorldWorker);
RunInThread runAddressbook(addressbookWorker);
RunInThread runAddressbookBackup(addressbookBackupWorker);
RunInThread runBeverages(beveragesWorker);
RunInThread runImage(imageWorker);
waitUntilServiceAvailable(primaryBroker.context, "addressbook");
waitUntilWorkerServiceAvailable(primaryBroker.context, addressbookWorker);

// Fake message publisher - sends messages on notifier.service
TestNode<mdp::MessageFormat::WithoutSourceId> publisher(primaryBroker.context);
Expand All @@ -102,9 +102,9 @@ int main(int argc, char **argv) {
{
std::cerr << "Sending new number (step " << i << ")\n";
mdp::Message notifyMessage;
notifyMessage.endpoint = mdp::Message::URI("/wine");
const auto data = std::to_string(i);
notifyMessage.data = opencmw::IoBuffer(data.data(), data.size());
notifyMessage.topic = mdp::Message::URI("/wine");
const auto data = std::to_string(i);
notifyMessage.data = opencmw::IoBuffer(data.data(), data.size());

beveragesWorker.notify(std::move(notifyMessage));
}
Expand All @@ -125,15 +125,15 @@ int main(int argc, char **argv) {
.contentType = opencmw::MIME::JSON
};

addressbookWorker.notify("/addressbook", context, entry);
addressbookWorker.notify(context, entry);

context.testFilter = "main";
entry.city = "London";
addressbookWorker.notify("/addressbook", context, entry);
addressbookWorker.notify(context, entry);

context.testFilter = "alternate";
entry.city = "Brighton";
addressbookWorker.notify("/addressbook", context, entry);
addressbookWorker.notify(context, entry);
}

std::this_thread::sleep_for(3s);
Expand Down
29 changes: 16 additions & 13 deletions concepts/majordomo/helpers.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -182,14 +182,16 @@ class ImageServiceWorker : public majordomo::Worker<serviceName, SimpleContext,
reply.image.base64 = base64pp::encode(imageData[selectedImage]);
reply.image.contentType = "image/png"; // MIME::PNG;
// TODO the subscription via REST has a leading slash, so this "/" is necessary for it to match, check if that can be avoided
super_t::notify("/", context, reply);
super_t::notify(context, reply);
}
});

super_t::setCallback([this](majordomo::RequestContext &rawCtx, const SimpleContext &, const majordomo::Empty &, SimpleContext &, BinaryData &out) {
using namespace opencmw;
const auto topicPath = rawCtx.request.endpoint.path().value_or("");
const auto path = ::detail::stripPrefix(topicPath, "/");
const auto params = rawCtx.request.topic.queryParamMap();
const auto pathParam = params.find("path");
auto path = pathParam != params.end() ? pathParam->second.value_or("") : "";
path = ::detail::stripPrefix(path, "/");
out.resourceName = ::detail::stripPrefix(::detail::stripPrefix(path, PROPERTY_NAME), "/");
out.image.base64 = base64pp::encode(imageData[selectedImage]);
out.image.contentType = "image/png"; // MIME::PNG;
Expand Down Expand Up @@ -281,26 +283,22 @@ class TestNode {
return zmq::invoke(zmq_bind, _socket, mdp::toZeroMQEndpoint(address).data()).isValid();
}

bool connect(const opencmw::URI<opencmw::STRICT> &address, const mdp::SubscriptionTopic &subscription = {}) {
bool connect(const opencmw::URI<opencmw::STRICT> &address, const std::string_view subscriptionTopic = {}) {
auto result = zmq::invoke(zmq_connect, _socket, mdp::toZeroMQEndpoint(address).data());
if (!result) return false;

if (!subscription.empty()) {
return subscribe(subscription);
if (!subscriptionTopic.empty()) {
return subscribe(subscriptionTopic);
}

return true;
}

bool subscribe(const mdp::SubscriptionTopic &subscription) {
const auto topic = subscription.toZmqTopic();
assert(!topic.empty());
bool subscribe(std::string_view topic) {
return zmq::invoke(zmq_setsockopt, _socket, ZMQ_SUBSCRIBE, topic.data(), topic.size()).isValid();
}

bool unsubscribe(const mdp::SubscriptionTopic &subscription) {
const auto topic = subscription.toZmqTopic();
assert(!topic.empty());
bool unsubscribe(const std::string_view topic) {
return zmq::invoke(zmq_setsockopt, _socket, ZMQ_UNSUBSCRIBE, topic.data(), topic.size()).isValid();
}

Expand Down Expand Up @@ -355,7 +353,7 @@ inline bool waitUntilServiceAvailable(const zmq::Context &context, std::string_v
mdp::Message request;
request.protocolName = mdp::clientProtocol;
request.command = mdp::Command::Get;
request.serviceName = "mmi.service";
request.serviceName = "/mmi.service";
request.data = opencmw::IoBuffer(serviceName.data(), serviceName.size());
client.send(std::move(request));

Expand All @@ -372,6 +370,11 @@ inline bool waitUntilServiceAvailable(const zmq::Context &context, std::string_v
return false;
}

template<typename Worker>
inline bool waitUntilWorkerServiceAvailable(const zmq::Context &context, const Worker &, const opencmw::URI<opencmw::STRICT> &brokerAddress = opencmw::majordomo::INTERNAL_ADDRESS_BROKER) {
return waitUntilServiceAvailable(context, Worker::name, brokerAddress);
}

class TestIntHandler {
int _x = 10;

Expand Down
42 changes: 22 additions & 20 deletions src/client/include/Client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
#include <ClientContext.hpp>
#include <MdpMessage.hpp>
#include <opencmw.hpp>
#include <SubscriptionTopic.hpp>
#include <Topic.hpp>
#include <URI.hpp>
#include <zmq/ZmqUtils.hpp>

Expand Down Expand Up @@ -96,26 +96,26 @@ class Client : public MDClientBase {

void get(const URI<STRICT> &uri, std::string_view req_id) override {
const auto &con = findConnection(uri);
auto message = createRequestTemplate(mdp::Command::Get, uri.relativeRefNoFragment().value(), req_id);
auto message = createRequestTemplate(mdp::Command::Get, uri, req_id);
zmq::send(std::move(message), con._socket).assertSuccess();
}

void set(const URI<STRICT> &uri, std::string_view req_id, const std::span<const std::byte> &request) override {
const auto &con = findConnection(uri);
auto message = createRequestTemplate(mdp::Command::Set, uri.relativeRefNoFragment().value(), req_id);
auto message = createRequestTemplate(mdp::Command::Set, uri, req_id);
message.data = IoBuffer(reinterpret_cast<const char *>(request.data()), request.size());
zmq::send(std::move(message), con._socket).assertSuccess();
}

void subscribe(const URI<STRICT> &uri, std::string_view req_id) override {
const auto &con = findConnection(uri);
auto message = createRequestTemplate(mdp::Command::Subscribe, uri.relativeRefNoFragment().value(), req_id);
auto message = createRequestTemplate(mdp::Command::Subscribe, uri, req_id);
zmq::send(std::move(message), con._socket).assertSuccess();
}

void unsubscribe(const URI<STRICT> &uri, std::string_view req_id) override {
const auto &con = findConnection(uri);
auto message = createRequestTemplate(mdp::Command::Unsubscribe, uri.relativeRefNoFragment().value(), req_id);
auto message = createRequestTemplate(mdp::Command::Unsubscribe, uri, req_id);
zmq::send(std::move(message), con._socket).assertSuccess();
}

Expand All @@ -134,7 +134,7 @@ class Client : public MDClientBase {

static bool handleMessage(mdp::Message &message) {
if (message.command == mdp::Command::Notify || message.command == mdp::Command::Final) {
message.arrivalTime = std::chrono::system_clock::now();
message.arrivalTime = std::chrono::system_clock::now();
const auto requestId_sv = message.clientRequestID.asString();
if (auto result = std::from_chars(requestId_sv.data(), requestId_sv.data() + requestId_sv.size(), message.id); result.ec == std::errc::invalid_argument || result.ec == std::errc::result_out_of_range) {
message.id = 0;
Expand Down Expand Up @@ -184,11 +184,13 @@ class Client : public MDClientBase {
}

private:
static mdp::Message createRequestTemplate(mdp::Command command, std::string_view serviceName, std::string_view req_id) {
static mdp::Message createRequestTemplate(mdp::Command command, const URI<STRICT> &uri, std::string_view req_id) {
const auto subscription = mdp::Topic::fromString(uri.relativeRefNoFragment().value_or(""));
mdp::Message req;
req.protocolName = mdp::clientProtocol;
req.command = command;
req.serviceName = std::string(serviceName);
req.serviceName = subscription.service();
req.topic = subscription.toMdpTopic();
req.clientRequestID = IoBuffer(req_id.data(), req_id.size());

return req;
Expand Down Expand Up @@ -246,17 +248,17 @@ class SubscriptionClient : public MDClientBase {
}

void subscribe(const URI<STRICT> &uri, std::string_view /*reqId*/) override {
auto &con = findConnection(uri);
const auto serviceName = mdp::SubscriptionTopic::fromURI(uri).toZmqTopic();
assert(!serviceName.empty());
opencmw::zmq::invoke(zmq_setsockopt, con._socket, ZMQ_SUBSCRIBE, serviceName.data(), serviceName.size()).assertSuccess();
auto &con = findConnection(uri);
const auto topic = mdp::Topic::fromString(uri.relativeRefNoFragment().value_or("")).toZmqTopic();
assert(!topic.empty());
opencmw::zmq::invoke(zmq_setsockopt, con._socket, ZMQ_SUBSCRIBE, topic.data(), topic.size()).assertSuccess();
}

void unsubscribe(const URI<STRICT> &uri, std::string_view /*reqId*/) override {
auto &con = findConnection(uri);
const auto serviceName = mdp::SubscriptionTopic::fromURI(uri).toZmqTopic();
assert(!serviceName.empty());
opencmw::zmq::invoke(zmq_setsockopt, con._socket, ZMQ_UNSUBSCRIBE, serviceName.data(), serviceName.size()).assertSuccess();
auto &con = findConnection(uri);
const auto topic = mdp::Topic::fromString(uri.relativeRefNoFragment().value_or("")).toZmqTopic();
assert(!topic.empty());
opencmw::zmq::invoke(zmq_setsockopt, con._socket, ZMQ_UNSUBSCRIBE, topic.data(), topic.size()).assertSuccess();
}

bool disconnect(detail::Connection &con) {
Expand All @@ -280,7 +282,7 @@ class SubscriptionClient : public MDClientBase {
output.error = std::move(message.error);
// output.serviceName = URI<uri_check::STRICT>(std::string{ message.serviceName() });
output.serviceName = std::move(message.sourceId); // temporary hack until serviceName -> 'requestedTopic' and 'topic' -> 'replyTopic'
output.endpoint = std::move(message.endpoint);
output.topic = std::move(message.topic);
output.clientRequestID = std::move(message.clientRequestID);
output.rbac = std::move(message.rbac);
output.id = 0; // review if this is still needed
Expand Down Expand Up @@ -389,15 +391,15 @@ class MDClientCtx : public ClientBase {
if (cmd.callback) {
if (cmd.command == mdp::Command::Get || cmd.command == mdp::Command::Set) {
req_id = _request_id++;
_requests.insert({ req_id, Request{ .uri = cmd.endpoint, .callback = std::move(cmd.callback), .timestamp_received = cmd.arrivalTime } });
_requests.insert({ req_id, Request{ .uri = cmd.topic, .callback = std::move(cmd.callback), .timestamp_received = cmd.arrivalTime } });
} else if (cmd.command == mdp::Command::Subscribe) {
req_id = _request_id++;
_subscriptions.insert({ mdp::SubscriptionTopic::fromURI(cmd.endpoint).toZmqTopic(), Subscription{ .uri = cmd.endpoint, .callback = std::move(cmd.callback), .timestamp_received = cmd.arrivalTime } });
_subscriptions.insert({ mdp::Topic::fromMdpTopic(cmd.topic).toZmqTopic(), Subscription{ .uri = cmd.topic, .callback = std::move(cmd.callback), .timestamp_received = cmd.arrivalTime } });
} else if (cmd.command == mdp::Command::Unsubscribe) {
_requests.erase(0); // todo: lookup correct subscription
}
}
sendCmd(cmd.endpoint, cmd.command, req_id, cmd.data);
sendCmd(cmd.topic, cmd.command, req_id, cmd.data);
}

private:
Expand Down
Loading