Skip to content

Commit

Permalink
Unify subscription topic handling and fix it for REST (#328, #331)
Browse files Browse the repository at this point in the history
Do not lose params when subscribing to a topic via REST.

Bump cpp-httplib to newest version, as it fixes an encoding error
with query parameters encoded in query parameters (SubscriptionContext)
when used with redirects.

To unify subscription handling (different client/servers were making
slightly different assumptions), the following is now implemented:

 - Service names must always start with "/".
 - The topic is of the form "/some/service?param1=..&param2", where the
   service name is mandatory in the topic, and the params are optional.
 - When serialized to a ZMQ topic, i.e. the string-based PUB/SUB topic
   matching, the parameters are sorted, to make the subscription and
   notification topics match exactly (required by ZMQ, as ZMQ subscription
   topics are string-, not URI-based)
 - Valid REST URIs are e.g. "http://localhost:8080/service?a&b"
   or http://localhost:8080/service?a&b", i.e. the service is a
   mandatory part of the URI. mds:// and mdp:// URIs are analogous.
 - On the ZMQ message level, the "serviceName" in above examples would
   be "/service" and the "topic" would be "/service?a&b" (here param
   order is not enforced).
  • Loading branch information
frankosterfeld committed Dec 15, 2023
1 parent 99ae7d2 commit 19a2fcb
Show file tree
Hide file tree
Showing 34 changed files with 1,289 additions and 928 deletions.
98 changes: 81 additions & 17 deletions README.md

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion cmake/DependenciesNative.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ option(WITH_PERF_TOOL "Build with perf-tools" OFF)
FetchContent_Declare(
cpp-httplib
GIT_REPOSITORY https://github.com/yhirose/cpp-httplib.git
GIT_TAG v0.11.2 # latest v0.11.2
GIT_TAG v0.14.2 # latest v0.14.2
)

# zlib: optional httplib dependency
Expand Down
2 changes: 1 addition & 1 deletion concepts/client/RestSubscription_example.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ int main() {
data.put(0);
opencmw::client::Command command;
command.command = opencmw::mdp::Command::Subscribe;
command.endpoint = opencmw::URI<opencmw::STRICT>("http://localhost:8080/event");
command.topic = opencmw::URI<opencmw::STRICT>("http://localhost:8080/event");
command.data = std::move(data);
command.callback = [&received](const opencmw::mdp::Message &rep) {
fmt::print("SSE client received reply = '{}' - body size: '{}'\n", rep.data.asString(), rep.data.size());
Expand Down
4 changes: 2 additions & 2 deletions concepts/client/helpers.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ struct rest_test_step {
requires(command != opencmw::mdp::Command::Set)
: _client(client), _resultChecker(std::move(resultChecker)), _expectedRepliesCount(expectedRepliesCount) {
_command.command = command;
_command.endpoint = endpoint;
_command.topic = endpoint;
_command.callback = [this](const opencmw::mdp::Message &reply) {
fmt::print("Reply R\"({})\"\n", reply.data.asString());
if (_resultChecker && !_resultChecker(reply)) {
Expand All @@ -66,7 +66,7 @@ struct rest_test_step {
: _client(client) {
_command.command = command;
_command.data = opencmw::IoBuffer(new_data.data(), new_data.size());
_command.endpoint = endpoint;
_command.topic = endpoint;
_command.callback = [this](const opencmw::mdp::Message & /*reply*/) {
next_step();
};
Expand Down
10 changes: 5 additions & 5 deletions concepts/majordomo/FilterSubscription_example.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ class AcquisitionWorker : public Worker<serviceName, FilterContext, Empty, Reply
Reply subscriptionReply;
try {
handleGetRequest(filterIn, filterOut, subscriptionReply);
super_t::notify(std::string(serviceName.c_str()), filterOut, subscriptionReply);
super_t::notify(filterOut, subscriptionReply);
} catch (const std::exception &ex) {
fmt::print("caught specific exception '{}'\n", ex.what());
} catch (...) {
Expand All @@ -133,7 +133,7 @@ int main() {
using opencmw::URI;

// init Broker and one simple Worker
Broker broker("PrimaryBroker");
Broker broker("/PrimaryBroker");
if (!broker.bind(URI<>("mds://127.0.0.1:12345"))) {
std::cerr << "Could not bind to broker address" << std::endl;
return 1;
Expand All @@ -154,11 +154,11 @@ int main() {
std::atomic<int> receivedA{ 0 };
std::atomic<int> receivedAB{ 0 };
client.subscribe(URI("mds://127.0.0.1:12345/DeviceName/Acquisition?signalFilter=A"), [&receivedA](const opencmw::mdp::Message &update) {
fmt::print("Client('A') received message from service '{}' for endpoint '{}'\n", update.serviceName, update.endpoint.str());
fmt::print("Client('A') received message from service '{}' for endpoint '{}'\n", update.serviceName, update.topic.str());
receivedA++;
});
client.subscribe(URI("mds://127.0.0.1:12345/DeviceName/Acquisition?signalFilter=A,B"), [&receivedAB](const opencmw::mdp::Message &update) {
fmt::print("Client('A,B') received message from service '{}' for endpoint '{}'\n", update.serviceName, update.endpoint.str());
client.subscribe(URI("mds://127.0.0.1:12345/DeviceName/Acquisition?signalFilter=A%2CB"), [&receivedAB](const opencmw::mdp::Message &update) {
fmt::print("Client('A,B') received message from service '{}' for endpoint '{}'\n", update.serviceName, update.topic.str());
receivedAB++;
});

Expand Down
30 changes: 15 additions & 15 deletions concepts/majordomo/MajordomoRest_example.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ int main(int argc, char **argv) {

// note: inconsistency: brokerName as ctor argument, worker's serviceName as NTTP
// note: default roles different from java (has: ADMIN, READ_WRITE, READ_ONLY, ANYONE, NULL)
majordomo::Broker primaryBroker("PrimaryBroker", testSettings());
majordomo::Broker primaryBroker("/PrimaryBroker", testSettings());
opencmw::query::registerTypes(SimpleContext(), primaryBroker);

auto fs = cmrc::assets::get_filesystem();
Expand Down Expand Up @@ -72,27 +72,27 @@ int main(int argc, char **argv) {
});

// second broker to test DNS functionalities
majordomo::Broker secondaryBroker("SecondaryTestBroker", { .dnsAddress = brokerRouterAddress->str() });
majordomo::Broker secondaryBroker("/SecondaryTestBroker", { .dnsAddress = brokerRouterAddress->str() });
std::jthread secondaryBrokerThread([&secondaryBroker] {
secondaryBroker.run();
});
});

//
majordomo::Worker<"helloWorld", SimpleContext, SimpleRequest, SimpleReply, majordomo::description<"A friendly service saying hello">> helloWorldWorker(primaryBroker, HelloWorldHandler());
majordomo::Worker<"addressbook", SimpleContext, AddressRequest, AddressEntry> addressbookWorker(primaryBroker, TestAddressHandler());
majordomo::Worker<"addressbookBackup", SimpleContext, AddressRequest, AddressEntry> addressbookBackupWorker(primaryBroker, TestAddressHandler());
majordomo::BasicWorker<"beverages"> beveragesWorker(primaryBroker, TestIntHandler(10));
majordomo::Worker<"/helloWorld", SimpleContext, SimpleRequest, SimpleReply, majordomo::description<"A friendly service saying hello">> helloWorldWorker(primaryBroker, HelloWorldHandler());
majordomo::Worker<"/addressbook", SimpleContext, AddressRequest, AddressEntry> addressbookWorker(primaryBroker, TestAddressHandler());
majordomo::Worker<"/addressbookBackup", SimpleContext, AddressRequest, AddressEntry> addressbookBackupWorker(primaryBroker, TestAddressHandler());
majordomo::BasicWorker<"/beverages"> beveragesWorker(primaryBroker, TestIntHandler(10));

//
ImageServiceWorker<"testImage", majordomo::description<"Returns an image">> imageWorker(primaryBroker, std::chrono::seconds(10));
ImageServiceWorker<"/testImage", majordomo::description<"Returns an image">> imageWorker(primaryBroker, std::chrono::seconds(10));

//
RunInThread runHelloWorld(helloWorldWorker);
RunInThread runAddressbook(addressbookWorker);
RunInThread runAddressbookBackup(addressbookBackupWorker);
RunInThread runBeverages(beveragesWorker);
RunInThread runImage(imageWorker);
waitUntilServiceAvailable(primaryBroker.context, "addressbook");
waitUntilWorkerServiceAvailable(primaryBroker.context, addressbookWorker);

// Fake message publisher - sends messages on notifier.service
TestNode<mdp::MessageFormat::WithoutSourceId> publisher(primaryBroker.context);
Expand All @@ -102,9 +102,9 @@ int main(int argc, char **argv) {
{
std::cerr << "Sending new number (step " << i << ")\n";
mdp::Message notifyMessage;
notifyMessage.endpoint = mdp::Message::URI("/wine");
const auto data = std::to_string(i);
notifyMessage.data = opencmw::IoBuffer(data.data(), data.size());
notifyMessage.topic = mdp::Message::URI("/wine");
const auto data = std::to_string(i);
notifyMessage.data = opencmw::IoBuffer(data.data(), data.size());

beveragesWorker.notify(std::move(notifyMessage));
}
Expand All @@ -125,15 +125,15 @@ int main(int argc, char **argv) {
.contentType = opencmw::MIME::JSON
};

addressbookWorker.notify("/addressbook", context, entry);
addressbookWorker.notify(context, entry);

context.testFilter = "main";
entry.city = "London";
addressbookWorker.notify("/addressbook", context, entry);
addressbookWorker.notify(context, entry);

context.testFilter = "alternate";
entry.city = "Brighton";
addressbookWorker.notify("/addressbook", context, entry);
addressbookWorker.notify(context, entry);
}

std::this_thread::sleep_for(3s);
Expand Down
29 changes: 16 additions & 13 deletions concepts/majordomo/helpers.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -182,14 +182,16 @@ class ImageServiceWorker : public majordomo::Worker<serviceName, SimpleContext,
reply.image.base64 = base64pp::encode(imageData[selectedImage]);
reply.image.contentType = "image/png"; // MIME::PNG;
// TODO the subscription via REST has a leading slash, so this "/" is necessary for it to match, check if that can be avoided
super_t::notify("/", context, reply);
super_t::notify(context, reply);
}
});

super_t::setCallback([this](majordomo::RequestContext &rawCtx, const SimpleContext &, const majordomo::Empty &, SimpleContext &, BinaryData &out) {
using namespace opencmw;
const auto topicPath = rawCtx.request.endpoint.path().value_or("");
const auto path = ::detail::stripPrefix(topicPath, "/");
const auto params = rawCtx.request.topic.queryParamMap();
const auto pathParam = params.find("path");
auto path = pathParam != params.end() ? pathParam->second.value_or("") : "";
path = ::detail::stripPrefix(path, "/");
out.resourceName = ::detail::stripPrefix(::detail::stripPrefix(path, PROPERTY_NAME), "/");
out.image.base64 = base64pp::encode(imageData[selectedImage]);
out.image.contentType = "image/png"; // MIME::PNG;
Expand Down Expand Up @@ -281,26 +283,22 @@ class TestNode {
return zmq::invoke(zmq_bind, _socket, mdp::toZeroMQEndpoint(address).data()).isValid();
}

bool connect(const opencmw::URI<opencmw::STRICT> &address, const mdp::SubscriptionTopic &subscription = {}) {
bool connect(const opencmw::URI<opencmw::STRICT> &address, const std::string_view subscriptionTopic = {}) {
auto result = zmq::invoke(zmq_connect, _socket, mdp::toZeroMQEndpoint(address).data());
if (!result) return false;

if (!subscription.empty()) {
return subscribe(subscription);
if (!subscriptionTopic.empty()) {
return subscribe(subscriptionTopic);
}

return true;
}

bool subscribe(const mdp::SubscriptionTopic &subscription) {
const auto topic = subscription.toZmqTopic();
assert(!topic.empty());
bool subscribe(std::string_view topic) {
return zmq::invoke(zmq_setsockopt, _socket, ZMQ_SUBSCRIBE, topic.data(), topic.size()).isValid();
}

bool unsubscribe(const mdp::SubscriptionTopic &subscription) {
const auto topic = subscription.toZmqTopic();
assert(!topic.empty());
bool unsubscribe(const std::string_view topic) {
return zmq::invoke(zmq_setsockopt, _socket, ZMQ_UNSUBSCRIBE, topic.data(), topic.size()).isValid();
}

Expand Down Expand Up @@ -355,7 +353,7 @@ inline bool waitUntilServiceAvailable(const zmq::Context &context, std::string_v
mdp::Message request;
request.protocolName = mdp::clientProtocol;
request.command = mdp::Command::Get;
request.serviceName = "mmi.service";
request.serviceName = "/mmi.service";
request.data = opencmw::IoBuffer(serviceName.data(), serviceName.size());
client.send(std::move(request));

Expand All @@ -372,6 +370,11 @@ inline bool waitUntilServiceAvailable(const zmq::Context &context, std::string_v
return false;
}

template<typename Worker>
inline bool waitUntilWorkerServiceAvailable(const zmq::Context &context, const Worker &, const opencmw::URI<opencmw::STRICT> &brokerAddress = opencmw::majordomo::INTERNAL_ADDRESS_BROKER) {
return waitUntilServiceAvailable(context, Worker::name, brokerAddress);
}

class TestIntHandler {
int _x = 10;

Expand Down
42 changes: 22 additions & 20 deletions src/client/include/Client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
#include <ClientContext.hpp>
#include <MdpMessage.hpp>
#include <opencmw.hpp>
#include <SubscriptionTopic.hpp>
#include <Topic.hpp>
#include <URI.hpp>
#include <zmq/ZmqUtils.hpp>

Expand Down Expand Up @@ -96,26 +96,26 @@ class Client : public MDClientBase {

void get(const URI<STRICT> &uri, std::string_view req_id) override {
const auto &con = findConnection(uri);
auto message = createRequestTemplate(mdp::Command::Get, uri.relativeRefNoFragment().value(), req_id);
auto message = createRequestTemplate(mdp::Command::Get, uri, req_id);
zmq::send(std::move(message), con._socket).assertSuccess();
}

void set(const URI<STRICT> &uri, std::string_view req_id, const std::span<const std::byte> &request) override {
const auto &con = findConnection(uri);
auto message = createRequestTemplate(mdp::Command::Set, uri.relativeRefNoFragment().value(), req_id);
auto message = createRequestTemplate(mdp::Command::Set, uri, req_id);
message.data = IoBuffer(reinterpret_cast<const char *>(request.data()), request.size());
zmq::send(std::move(message), con._socket).assertSuccess();
}

void subscribe(const URI<STRICT> &uri, std::string_view req_id) override {
const auto &con = findConnection(uri);
auto message = createRequestTemplate(mdp::Command::Subscribe, uri.relativeRefNoFragment().value(), req_id);
auto message = createRequestTemplate(mdp::Command::Subscribe, uri, req_id);
zmq::send(std::move(message), con._socket).assertSuccess();
}

void unsubscribe(const URI<STRICT> &uri, std::string_view req_id) override {
const auto &con = findConnection(uri);
auto message = createRequestTemplate(mdp::Command::Unsubscribe, uri.relativeRefNoFragment().value(), req_id);
auto message = createRequestTemplate(mdp::Command::Unsubscribe, uri, req_id);
zmq::send(std::move(message), con._socket).assertSuccess();
}

Expand All @@ -134,7 +134,7 @@ class Client : public MDClientBase {

static bool handleMessage(mdp::Message &message) {
if (message.command == mdp::Command::Notify || message.command == mdp::Command::Final) {
message.arrivalTime = std::chrono::system_clock::now();
message.arrivalTime = std::chrono::system_clock::now();
const auto requestId_sv = message.clientRequestID.asString();
if (auto result = std::from_chars(requestId_sv.data(), requestId_sv.data() + requestId_sv.size(), message.id); result.ec == std::errc::invalid_argument || result.ec == std::errc::result_out_of_range) {
message.id = 0;
Expand Down Expand Up @@ -184,11 +184,13 @@ class Client : public MDClientBase {
}

private:
static mdp::Message createRequestTemplate(mdp::Command command, std::string_view serviceName, std::string_view req_id) {
static mdp::Message createRequestTemplate(mdp::Command command, const URI<STRICT> &uri, std::string_view req_id) {
const auto subscription = mdp::Topic::fromString(uri.relativeRefNoFragment().value_or(""));
mdp::Message req;
req.protocolName = mdp::clientProtocol;
req.command = command;
req.serviceName = std::string(serviceName);
req.serviceName = subscription.service();
req.topic = subscription.toMdpTopic();
req.clientRequestID = IoBuffer(req_id.data(), req_id.size());

return req;
Expand Down Expand Up @@ -246,17 +248,17 @@ class SubscriptionClient : public MDClientBase {
}

void subscribe(const URI<STRICT> &uri, std::string_view /*reqId*/) override {
auto &con = findConnection(uri);
const auto serviceName = mdp::SubscriptionTopic::fromURI(uri).toZmqTopic();
assert(!serviceName.empty());
opencmw::zmq::invoke(zmq_setsockopt, con._socket, ZMQ_SUBSCRIBE, serviceName.data(), serviceName.size()).assertSuccess();
auto &con = findConnection(uri);
const auto topic = mdp::Topic::fromString(uri.relativeRefNoFragment().value_or("")).toZmqTopic();
assert(!topic.empty());
opencmw::zmq::invoke(zmq_setsockopt, con._socket, ZMQ_SUBSCRIBE, topic.data(), topic.size()).assertSuccess();
}

void unsubscribe(const URI<STRICT> &uri, std::string_view /*reqId*/) override {
auto &con = findConnection(uri);
const auto serviceName = mdp::SubscriptionTopic::fromURI(uri).toZmqTopic();
assert(!serviceName.empty());
opencmw::zmq::invoke(zmq_setsockopt, con._socket, ZMQ_UNSUBSCRIBE, serviceName.data(), serviceName.size()).assertSuccess();
auto &con = findConnection(uri);
const auto topic = mdp::Topic::fromString(uri.relativeRefNoFragment().value_or("")).toZmqTopic();
assert(!topic.empty());
opencmw::zmq::invoke(zmq_setsockopt, con._socket, ZMQ_UNSUBSCRIBE, topic.data(), topic.size()).assertSuccess();
}

bool disconnect(detail::Connection &con) {
Expand All @@ -280,7 +282,7 @@ class SubscriptionClient : public MDClientBase {
output.error = std::move(message.error);
// output.serviceName = URI<uri_check::STRICT>(std::string{ message.serviceName() });
output.serviceName = std::move(message.sourceId); // temporary hack until serviceName -> 'requestedTopic' and 'topic' -> 'replyTopic'
output.endpoint = std::move(message.endpoint);
output.topic = std::move(message.topic);
output.clientRequestID = std::move(message.clientRequestID);
output.rbac = std::move(message.rbac);
output.id = 0; // review if this is still needed
Expand Down Expand Up @@ -389,15 +391,15 @@ class MDClientCtx : public ClientBase {
if (cmd.callback) {
if (cmd.command == mdp::Command::Get || cmd.command == mdp::Command::Set) {
req_id = _request_id++;
_requests.insert({ req_id, Request{ .uri = cmd.endpoint, .callback = std::move(cmd.callback), .timestamp_received = cmd.arrivalTime } });
_requests.insert({ req_id, Request{ .uri = cmd.topic, .callback = std::move(cmd.callback), .timestamp_received = cmd.arrivalTime } });
} else if (cmd.command == mdp::Command::Subscribe) {
req_id = _request_id++;
_subscriptions.insert({ mdp::SubscriptionTopic::fromURI(cmd.endpoint).toZmqTopic(), Subscription{ .uri = cmd.endpoint, .callback = std::move(cmd.callback), .timestamp_received = cmd.arrivalTime } });
_subscriptions.insert({ mdp::Topic::fromMdpTopic(cmd.topic).toZmqTopic(), Subscription{ .uri = cmd.topic, .callback = std::move(cmd.callback), .timestamp_received = cmd.arrivalTime } });
} else if (cmd.command == mdp::Command::Unsubscribe) {
_requests.erase(0); // todo: lookup correct subscription
}
}
sendCmd(cmd.endpoint, cmd.command, req_id, cmd.data);
sendCmd(cmd.topic, cmd.command, req_id, cmd.data);
}

private:
Expand Down
Loading

0 comments on commit 19a2fcb

Please sign in to comment.