diff --git a/cmake/DependenciesNative.cmake b/cmake/DependenciesNative.cmake index 0bf1a02f..6c734ba5 100644 --- a/cmake/DependenciesNative.cmake +++ b/cmake/DependenciesNative.cmake @@ -18,7 +18,7 @@ option(WITH_PERF_TOOL "Build with perf-tools" OFF) FetchContent_Declare( cpp-httplib GIT_REPOSITORY https://github.com/yhirose/cpp-httplib.git - GIT_TAG v0.11.2 # latest v0.11.2 + GIT_TAG v0.14.1 # latest v0.14.1 ) # zlib: optional httplib dependency diff --git a/concepts/client/RestSubscription_example.cpp b/concepts/client/RestSubscription_example.cpp index 7f5141d4..5912dce6 100644 --- a/concepts/client/RestSubscription_example.cpp +++ b/concepts/client/RestSubscription_example.cpp @@ -19,9 +19,7 @@ class EventDispatcher { std::unique_lock lk(_mutex); int id = std::atomic_load_explicit(&_id, std::memory_order_acquire); _condition.wait(lk, [id, this] { return _cid == id; }); - if (sink.is_writable()) { - sink.write(_message.data(), _message.size()); - } + sink.write(_message.data(), _message.size()); } void send_event(const std::string_view &message) { diff --git a/concepts/majordomo/FilterSubscription_example.cpp b/concepts/majordomo/FilterSubscription_example.cpp index c5bb0a26..6edd956f 100644 --- a/concepts/majordomo/FilterSubscription_example.cpp +++ b/concepts/majordomo/FilterSubscription_example.cpp @@ -138,7 +138,7 @@ int main() { std::cerr << "Could not bind to broker address" << std::endl; return 1; } - AcquisitionWorker<"/DeviceName/Acquisition"> acquisitionWorker(broker); + AcquisitionWorker<"Acquisition"> acquisitionWorker(broker); // start broker and worker as background processes std::jthread brokerThread([&broker] { broker.run(); }); @@ -153,11 +153,11 @@ int main() { std::atomic receivedA{ 0 }; std::atomic receivedAB{ 0 }; - client.subscribe(URI("mds://127.0.0.1:12345/DeviceName/Acquisition?signalFilter=A"), [&receivedA](const opencmw::mdp::Message &update) { + client.subscribe(URI("mds://127.0.0.1:12345/Acquisition?signalFilter=A"), [&receivedA](const opencmw::mdp::Message &update) { fmt::print("Client('A') received message from service '{}' for endpoint '{}'\n", update.serviceName, update.endpoint.str()); receivedA++; }); - client.subscribe(URI("mds://127.0.0.1:12345/DeviceName/Acquisition?signalFilter=A,B"), [&receivedAB](const opencmw::mdp::Message &update) { + client.subscribe(URI("mds://127.0.0.1:12345/Acquisition?signalFilter=A%2CB"), [&receivedAB](const opencmw::mdp::Message &update) { fmt::print("Client('A,B') received message from service '{}' for endpoint '{}'\n", update.serviceName, update.endpoint.str()); receivedAB++; }); diff --git a/concepts/majordomo/helpers.hpp b/concepts/majordomo/helpers.hpp index 2cff6764..c0e7efc9 100644 --- a/concepts/majordomo/helpers.hpp +++ b/concepts/majordomo/helpers.hpp @@ -281,26 +281,22 @@ class TestNode { return zmq::invoke(zmq_bind, _socket, mdp::toZeroMQEndpoint(address).data()).isValid(); } - bool connect(const opencmw::URI &address, const mdp::SubscriptionTopic &subscription = {}) { + bool connect(const opencmw::URI &address, const std::string_view subscriptionTopic = {}) { auto result = zmq::invoke(zmq_connect, _socket, mdp::toZeroMQEndpoint(address).data()); if (!result) return false; - if (!subscription.empty()) { - return subscribe(subscription); + if (!subscriptionTopic.empty()) { + return subscribe(subscriptionTopic); } return true; } - bool subscribe(const mdp::SubscriptionTopic &subscription) { - const auto topic = subscription.toZmqTopic(); - assert(!topic.empty()); + bool subscribe(std::string_view topic) { return zmq::invoke(zmq_setsockopt, _socket, ZMQ_SUBSCRIBE, topic.data(), topic.size()).isValid(); } - bool unsubscribe(const mdp::SubscriptionTopic &subscription) { - const auto topic = subscription.toZmqTopic(); - assert(!topic.empty()); + bool unsubscribe(const std::string_view topic) { return zmq::invoke(zmq_setsockopt, _socket, ZMQ_UNSUBSCRIBE, topic.data(), topic.size()).isValid(); } diff --git a/src/client/include/Client.hpp b/src/client/include/Client.hpp index 0b3981ea..7dcb8b67 100644 --- a/src/client/include/Client.hpp +++ b/src/client/include/Client.hpp @@ -96,26 +96,26 @@ class Client : public MDClientBase { void get(const URI &uri, std::string_view req_id) override { const auto &con = findConnection(uri); - auto message = createRequestTemplate(mdp::Command::Get, uri.relativeRefNoFragment().value(), req_id); + auto message = createRequestTemplate(mdp::Command::Get, uri, req_id); zmq::send(std::move(message), con._socket).assertSuccess(); } void set(const URI &uri, std::string_view req_id, const std::span &request) override { const auto &con = findConnection(uri); - auto message = createRequestTemplate(mdp::Command::Set, uri.relativeRefNoFragment().value(), req_id); + auto message = createRequestTemplate(mdp::Command::Set, uri, req_id); message.data = IoBuffer(reinterpret_cast(request.data()), request.size()); zmq::send(std::move(message), con._socket).assertSuccess(); } void subscribe(const URI &uri, std::string_view req_id) override { const auto &con = findConnection(uri); - auto message = createRequestTemplate(mdp::Command::Subscribe, uri.relativeRefNoFragment().value(), req_id); + auto message = createRequestTemplate(mdp::Command::Subscribe, uri, req_id); zmq::send(std::move(message), con._socket).assertSuccess(); } void unsubscribe(const URI &uri, std::string_view req_id) override { const auto &con = findConnection(uri); - auto message = createRequestTemplate(mdp::Command::Unsubscribe, uri.relativeRefNoFragment().value(), req_id); + auto message = createRequestTemplate(mdp::Command::Unsubscribe, uri, req_id); zmq::send(std::move(message), con._socket).assertSuccess(); } @@ -134,7 +134,7 @@ class Client : public MDClientBase { static bool handleMessage(mdp::Message &message) { if (message.command == mdp::Command::Notify || message.command == mdp::Command::Final) { - message.arrivalTime = std::chrono::system_clock::now(); + message.arrivalTime = std::chrono::system_clock::now(); const auto requestId_sv = message.clientRequestID.asString(); if (auto result = std::from_chars(requestId_sv.data(), requestId_sv.data() + requestId_sv.size(), message.id); result.ec == std::errc::invalid_argument || result.ec == std::errc::result_out_of_range) { message.id = 0; @@ -184,11 +184,30 @@ class Client : public MDClientBase { } private: - static mdp::Message createRequestTemplate(mdp::Command command, std::string_view serviceName, std::string_view req_id) { + static mdp::Message createRequestTemplate(mdp::Command command, const URI &uri, std::string_view req_id) { + const auto ref = uri.relativeRefNoFragment(); + const auto &[serviceName, endpoint] = [&ref] { + if (!ref) { + return std::pair{ std::string_view{}, URI<>{ "" } }; + } + auto view = std::string_view{ *ref }; + while (view.starts_with("/")) { + view.remove_prefix(1); + } + const auto serviceEnd = view.find_first_of("/?"); + if (serviceEnd == std::string_view::npos) { + return std::pair{ view, URI<>{ "" } }; + } + auto service = view.substr(0, serviceEnd); + view.remove_prefix(service.size()); + return std::pair{ service, URI<>{ std::string(view) } }; + }(); + mdp::Message req; req.protocolName = mdp::clientProtocol; req.command = command; req.serviceName = std::string(serviceName); + req.endpoint = std::move(endpoint); req.clientRequestID = IoBuffer(req_id.data(), req_id.size()); return req; @@ -247,16 +266,16 @@ class SubscriptionClient : public MDClientBase { void subscribe(const URI &uri, std::string_view /*reqId*/) override { auto &con = findConnection(uri); - const auto serviceName = mdp::SubscriptionTopic::fromURI(uri).toZmqTopic(); - assert(!serviceName.empty()); - opencmw::zmq::invoke(zmq_setsockopt, con._socket, ZMQ_SUBSCRIBE, serviceName.data(), serviceName.size()).assertSuccess(); + const auto topic = mdp::SubscriptionTopic::fromServiceAndEndpointString(uri.relativeRefNoFragment().value_or("")).toZmqTopic(); + assert(!topic.empty()); + opencmw::zmq::invoke(zmq_setsockopt, con._socket, ZMQ_SUBSCRIBE, topic.data(), topic.size()).assertSuccess(); } void unsubscribe(const URI &uri, std::string_view /*reqId*/) override { auto &con = findConnection(uri); - const auto serviceName = mdp::SubscriptionTopic::fromURI(uri).toZmqTopic(); - assert(!serviceName.empty()); - opencmw::zmq::invoke(zmq_setsockopt, con._socket, ZMQ_UNSUBSCRIBE, serviceName.data(), serviceName.size()).assertSuccess(); + const auto topic = mdp::SubscriptionTopic::fromServiceAndEndpointString(uri.relativeRefNoFragment().value_or("")).toZmqTopic(); + assert(!topic.empty()); + opencmw::zmq::invoke(zmq_setsockopt, con._socket, ZMQ_UNSUBSCRIBE, topic.data(), topic.size()).assertSuccess(); } bool disconnect(detail::Connection &con) { @@ -392,7 +411,7 @@ class MDClientCtx : public ClientBase { _requests.insert({ req_id, Request{ .uri = cmd.endpoint, .callback = std::move(cmd.callback), .timestamp_received = cmd.arrivalTime } }); } else if (cmd.command == mdp::Command::Subscribe) { req_id = _request_id++; - _subscriptions.insert({ mdp::SubscriptionTopic::fromURI(cmd.endpoint).toZmqTopic(), Subscription{ .uri = cmd.endpoint, .callback = std::move(cmd.callback), .timestamp_received = cmd.arrivalTime } }); + _subscriptions.insert({ mdp::SubscriptionTopic::fromEndpoint(cmd.endpoint).toZmqTopic(), Subscription{ .uri = cmd.endpoint, .callback = std::move(cmd.callback), .timestamp_received = cmd.arrivalTime } }); } else if (cmd.command == mdp::Command::Unsubscribe) { _requests.erase(0); // todo: lookup correct subscription } diff --git a/src/client/include/MockServer.hpp b/src/client/include/MockServer.hpp index 81ab8e08..6e3a9dab 100644 --- a/src/client/include/MockServer.hpp +++ b/src/client/include/MockServer.hpp @@ -91,15 +91,16 @@ class MockServer { return true; } - void notify(const mdp::SubscriptionTopic &topic, std::string_view value) { + void notify(std::string_view topic, std::string_view value) { static const auto brokerName = ""; static const auto serviceName = "a.service"; + const auto subscription = mdp::SubscriptionTopic::fromServiceAndEndpoint(serviceName, URI<>(std::string(topic))); mdp::BasicMessage notify; notify.protocolName = mdp::clientProtocol; notify.command = mdp::Command::Final; notify.serviceName = serviceName; - notify.endpoint = topic.toEndpoint(); - notify.sourceId = topic.toZmqTopic(); + notify.endpoint = subscription.toEndpoint(); + notify.sourceId = subscription.toZmqTopic(); notify.clientRequestID = IoBuffer(brokerName); notify.data = IoBuffer(value.data(), value.size()); zmq::send(std::move(notify), _pubSocket.value()).assertSuccess(); diff --git a/src/client/include/RestClientNative.hpp b/src/client/include/RestClientNative.hpp index 0e4e1fba..3800c933 100644 --- a/src/client/include/RestClientNative.hpp +++ b/src/client/include/RestClientNative.hpp @@ -229,7 +229,7 @@ class RestClient : public ClientBase { return {}; } - const auto httpError = httplib::detail::status_message(result->status); + const auto httpError = httplib::status_message(result->status); return fmt::format("{} - {}:{}", result->status, httpError, errorMsgExt.empty() ? result->body : errorMsgExt); }(); @@ -266,6 +266,7 @@ class RestClient : public ClientBase { auto endpoint = endpointBuilder.build(); auto callback = [&cmd, &preferredHeader, &endpoint](ClientType &client) { + client.set_follow_location(true); client.set_read_timeout(cmd.timeout); // default keep-alive value if (const httplib::Result &result = client.Get(endpoint.relativeRef()->data(), preferredHeader)) { returnMdpMessage(cmd, result); @@ -316,9 +317,10 @@ class RestClient : public ClientBase { ) { auto it = _subscription1.find(cmd.endpoint); if (it == _subscription1.end()) { - auto &client = _subscription1.try_emplace(cmd.endpoint, httplib::Client(cmd.endpoint.hostName().value(), cmd.endpoint.port().value())).first->second; + auto &client = _subscription1.try_emplace(cmd.endpoint, httplib::Client(cmd.endpoint.hostName().value(), cmd.endpoint.port().value())).first->second; + client.set_follow_location(true); - auto longPollingEndpoint = [&] { + auto longPollingEndpoint = [&] { if (!cmd.endpoint.queryParamMap().contains(LONG_POLLING_IDX_TAG)) { return URI<>::factory(cmd.endpoint).addQueryParameter(LONG_POLLING_IDX_TAG, "Next").build(); } else { diff --git a/src/client/test/ClientPublisher_tests.cpp b/src/client/test/ClientPublisher_tests.cpp index beaf1265..11388953 100644 --- a/src/client/test/ClientPublisher_tests.cpp +++ b/src/client/test/ClientPublisher_tests.cpp @@ -74,7 +74,7 @@ TEST_CASE("Basic subscription test", "[ClientContext]") { ClientContext clientContext{ std::move(clients) }; std::this_thread::sleep_for(100ms); // subscription - auto endpoint = URI::factory(URI(server.addressSub())).scheme("mds").path("/a.topic").addQueryParameter("C", "2").build(); + auto endpoint = URI::factory(URI(server.addressSub())).scheme("mds").path("/a.service/a.topic").addQueryParameter("C", "2").build(); std::atomic received{ 0 }; clientContext.subscribe(endpoint, [&received, &payload](const Message &update) { if (update.data.size() == payload.size()) { @@ -84,7 +84,8 @@ TEST_CASE("Basic subscription test", "[ClientContext]") { std::this_thread::sleep_for(100ms); // allow for the subscription request to be processed // send notifications for (int i = 0; i < 100; i++) { - server.notify(mdp::SubscriptionTopic::fromURI(endpoint), payload); + server.notify("/a.topic?C=2", payload); // received by client + server.notify("/a.topic?C=3", payload); // not received by client } std::this_thread::sleep_for(10ms); // allow for all the notifications to reach the client fmt::print("received notifications {}\n", received.load()); diff --git a/src/client/test/CmwClient_tests.cpp b/src/client/test/CmwClient_tests.cpp index f8a71d65..2066db0d 100644 --- a/src/client/test/CmwClient_tests.cpp +++ b/src/client/test/CmwClient_tests.cpp @@ -71,14 +71,14 @@ TEST_CASE("Basic Client Subscription Test", "[Client]") { subscriptionClient.connect(uri); subscriptionClient.housekeeping(std::chrono::system_clock::now()); - const auto endpoint = URI::UriFactory(uri).path("a.topic").build(); + const auto endpoint = URI::UriFactory(uri).path("a.service/a.topic").build(); std::string reqId = "2"; subscriptionClient.subscribe(endpoint, reqId); std::this_thread::sleep_for(50ms); // allow for subscription to be established - server.notify(SubscriptionTopic("a.topic?ctx=test_ctx1"), "101"); - server.notify(SubscriptionTopic("a.topic?ctx=test_ctx2"), "102"); + server.notify("/a.topic?ctx=test_ctx1", "101"); + server.notify("/a.topic?ctx=test_ctx2", "102"); Message resultOfNotify1; REQUIRE(subscriptionClient.receive(resultOfNotify1)); diff --git a/src/client/test/MockServerTest.cpp b/src/client/test/MockServerTest.cpp index e252770d..f1c22d47 100644 --- a/src/client/test/MockServerTest.cpp +++ b/src/client/test/MockServerTest.cpp @@ -62,11 +62,11 @@ TEST_CASE("MockServer Subscription Test", "[mock-server][lambda_handler]") { BrokerMessageNode client(context, ZMQ_SUB); REQUIRE(client.connect(opencmw::URI<>(server.addressSub()))); - client.subscribe(mdp::SubscriptionTopic("a.topic")); + client.subscribe("/a.service/a.topic"); std::this_thread::sleep_for(10ms); // wait for the subscription to be set-up. todo: investigate more clever way - server.notify(mdp::SubscriptionTopic("a.topic"), "100"); - server.notify(mdp::SubscriptionTopic("a.topic"), "23"); + server.notify("/a.topic", "100"); + server.notify("/a.topic", "23"); { auto reply = client.tryReadOne(); @@ -85,7 +85,7 @@ TEST_CASE("MockServer Subscription Test", "[mock-server][lambda_handler]") { REQUIRE(reply->endpoint.str() == "/a.topic"); } - server.notify(mdp::SubscriptionTopic("a.topic"), "10"); + server.notify("/a.topic", "10"); { auto reply = client.tryReadOne(); fmt::print("{}\n", reply.has_value()); diff --git a/src/client/test/RestClient_tests.cpp b/src/client/test/RestClient_tests.cpp index fb2aead0..9cdc5819 100644 --- a/src/client/test/RestClient_tests.cpp +++ b/src/client/test/RestClient_tests.cpp @@ -190,9 +190,7 @@ class EventDispatcher { std::unique_lock lk(_mutex); int id = _id; _condition.wait(lk, [&id, this] { return _cid == id; }); - if (sink.is_writable()) { - sink.write(_message.data(), _message.size()); - } + sink.write(_message.data(), _message.size()); } void send_event(const std::string_view &message) { diff --git a/src/core/include/SubscriptionTopic.hpp b/src/core/include/SubscriptionTopic.hpp index 7646b456..7e634a37 100644 --- a/src/core/include/SubscriptionTopic.hpp +++ b/src/core/include/SubscriptionTopic.hpp @@ -22,12 +22,12 @@ using namespace std::string_literals; // (serializable and deserializable to a string/URI). // struct SubscriptionTopic { - using map = std::unordered_map>; + using Params = std::unordered_map>; private: std::string _service; std::string _path; - map _params; + Params _params; static std::string stripSlashes(std::string_view str, bool addLeadingSlash) { auto firstNonSlash = str.find_first_not_of("/ "); @@ -35,38 +35,100 @@ struct SubscriptionTopic { return ""; } - auto lastNonSlash = str.find_last_not_of("/ "); + auto lastNonSlash = str.find_last_not_of("/ "); + const auto path = std::string(str.data() + firstNonSlash, lastNonSlash - firstNonSlash + 1); if (addLeadingSlash) { - return "/"s + std::string(str.data() + firstNonSlash, lastNonSlash - firstNonSlash + 1); + return "/"s + path; } else { - return std::string(str.data() + firstNonSlash, lastNonSlash - firstNonSlash + 1); + return path; } } public: SubscriptionTopic() = default; - template - requires(!std::same_as>) - explicit SubscriptionTopic(PathString &&path) - : SubscriptionTopic(""s, std::forward(path), {}) {} + template + SubscriptionTopic(ServiceString &&service, PathString &&path, Params params) + : _service(stripSlashes(std::forward(service), false)) + , _path(stripSlashes(std::forward(path), true)) + , _params(std::move(params)) { + if (_path.find("?") != std::string::npos) { + if (_params.size() != 0) { + throw fmt::format("Parameters are not empty, and there are more in the path {} {}\n", _params, _path); + } + auto parsed = opencmw::URI(_path); + _path = parsed.path().value_or("/"); + _params = parsed.queryParamMap(); + } - SubscriptionTopic(const SubscriptionTopic &other) = default; - SubscriptionTopic &operator=(const SubscriptionTopic &) = default; - SubscriptionTopic(SubscriptionTopic &&) noexcept = default; - SubscriptionTopic &operator=(SubscriptionTopic &&) noexcept = default; + if (_path == "/") { + _path.clear(); + } - auto operator<=>(const SubscriptionTopic &) const = default; + auto is_char_valid = [](char c) { + return std::isalnum(c) || c == '.'; + }; + + if (!std::all_of(_service.cbegin(), _service.cbegin(), is_char_valid)) { + throw fmt::format("Invalid service name {}\n", _service); + } + if (!std::all_of(_path.cbegin(), _path.cbegin(), is_char_valid)) { + throw fmt::format("Invalid path {}\n", _path); + } + } + + SubscriptionTopic(const SubscriptionTopic &other) = default; + SubscriptionTopic &operator=(const SubscriptionTopic &) = default; + SubscriptionTopic(SubscriptionTopic &&) noexcept = default; + SubscriptionTopic &operator=(SubscriptionTopic &&) noexcept = default; + + bool operator==(const SubscriptionTopic &) const = default; + + /** + * Parses subscription from a "service/topic" string + * + * @param str A string where the first path segment is the service name, e.g. "/service/topic", "service?param", or + * "/service/topic?param" + * @param params Optional query parameters, if non-empty, @p str must not contain query parameters + */ + static SubscriptionTopic fromServiceAndEndpointString(std::string_view str, Params params = {}) { + while (str.starts_with("/")) { + str.remove_prefix(1); + } + + const auto sep = str.find_first_of("/?"); + if (sep == std::string_view::npos) { + return SubscriptionTopic(str, "", std::move(params)); + } + + const auto service = str.substr(0, sep); + str.remove_prefix(service.size()); + return SubscriptionTopic(service, str, std::move(params)); + } template - static SubscriptionTopic fromURI(const TURI &uri) { - return SubscriptionTopic(""s, uri.path().value_or(""s), uri.queryParamMap()); + static SubscriptionTopic fromEndpoint(const TURI &endpoint) { + return SubscriptionTopic(""s, endpoint.path().value_or(""s), endpoint.queryParamMap()); + } + + template + static SubscriptionTopic fromServiceAndEndpoint(ServiceString &&serviceName, const TURI &endpoint) { + return SubscriptionTopic(std::forward(serviceName), endpoint.path().value_or(""s), endpoint.queryParamMap()); } - template - static SubscriptionTopic fromURIAndServiceName(const TURI &uri, ServiceString &&serviceName) { - return SubscriptionTopic(std::forward(serviceName), uri.path().value_or(""s), uri.queryParamMap()); + static SubscriptionTopic fromZmqTopic(std::string_view topic) { + if (!topic.starts_with("/")) { + return {}; + } + topic.remove_prefix(1); + const auto serviceEnd = topic.find_first_of("/?"); + if (serviceEnd == std::string_view::npos) { + return SubscriptionTopic(topic, "", {}); + } + const auto service = topic.substr(0, serviceEnd); + topic.remove_prefix(service.size()); + return SubscriptionTopic(service, topic, {}); } opencmw::URI toEndpoint() const { @@ -77,7 +139,7 @@ struct SubscriptionTopic { return _service.empty() && _path.empty() && _params.empty(); } - std::string toZmqTopic() const { + std::string topic() const { std::string topic = _path; if (_params.empty()) { return topic; @@ -98,6 +160,15 @@ struct SubscriptionTopic { return topic; } + std::string toZmqTopic() const { + std::string zmqTopic; + if (!_service.empty()) { + zmqTopic += "/" + _service; + } + + return zmqTopic + topic(); + } + [[nodiscard]] std::size_t hash() const noexcept { std::size_t seed = 0; opencmw::detail::hash_combine(seed, _service); @@ -113,33 +184,6 @@ struct SubscriptionTopic { std::string_view path() const { return _path; } std::string_view service() const { return _service; } const auto ¶ms() const { return _params; } - -private: - template - SubscriptionTopic(ServiceString &&service, PathString &&path, std::unordered_map> params) - : _service(stripSlashes(std::forward(service), false)) - , _path(stripSlashes(std::forward(path), true)) - , _params(std::move(params)) { - if (_path.find("?") != std::string::npos) { - if (_params.size() != 0) { - throw fmt::format("Parameters are not empty, and there are more in the path {} {}\n", _params, _path); - } - auto parsed = opencmw::URI(_path); - _path = parsed.path().value_or("/"); - _params = parsed.queryParamMap(); - } - - auto is_char_valid = [](char c) { - return std::isalnum(c) || c == '.'; - }; - - if (!std::all_of(_service.cbegin(), _service.cbegin(), is_char_valid)) { - throw fmt::format("Invalid service name {}\n", _service); - } - if (!std::all_of(_path.cbegin(), _path.cbegin(), is_char_valid)) { - throw fmt::format("Invalid path {}\n", _path); - } - } }; } // namespace opencmw::mdp diff --git a/src/majordomo/include/majordomo/Broker.hpp b/src/majordomo/include/majordomo/Broker.hpp index d23cb66b..67f07e6d 100644 --- a/src/majordomo/include/majordomo/Broker.hpp +++ b/src/majordomo/include/majordomo/Broker.hpp @@ -192,11 +192,10 @@ class Broker { const zmq::Socket &socket; const std::string id; const std::string serviceName; - const std::string serviceNameTopic; Timestamp expiry; explicit Worker(const zmq::Socket &s, const std::string &id_, const std::string &serviceName_, Timestamp expiry_) - : socket(s), id{ std::move(id_) }, serviceName{ std::move(serviceName_) }, serviceNameTopic(std::string("/") + serviceName), expiry{ std::move(expiry_) } {} + : socket(s), id{ std::move(id_) }, serviceName{ std::move(serviceName_) }, expiry{ std::move(expiry_) } {} }; struct Service { @@ -547,8 +546,7 @@ class Broker { } const auto topicString = data.substr(1); - const auto topicURI = URI(std::string(topicString)); - const auto topic = mdp::SubscriptionTopic::fromURI(topicURI); + const auto topic = mdp::SubscriptionTopic::fromZmqTopic(topicString); if (data[0] == '\x1') { subscribe(topic); @@ -579,7 +577,7 @@ class Broker { return true; } case mdp::Command::Subscribe: { - const auto subscription = mdp::SubscriptionTopic::fromURIAndServiceName(message.endpoint, message.serviceName); + const auto subscription = mdp::SubscriptionTopic::fromServiceAndEndpoint(message.serviceName, message.endpoint); subscribe(subscription); @@ -588,7 +586,7 @@ class Broker { return true; } case mdp::Command::Unsubscribe: { - const auto subscription = mdp::SubscriptionTopic::fromURIAndServiceName(message.endpoint, message.serviceName); + const auto subscription = mdp::SubscriptionTopic::fromServiceAndEndpoint(message.serviceName, message.endpoint); unsubscribe(subscription); @@ -676,22 +674,19 @@ class Broker { } void dispatchMessageToMatchingSubscribers(BrokerMessage &&message) { - const auto subscription = mdp::SubscriptionTopic::fromURIAndServiceName(message.endpoint, message.serviceName); + const auto notification = mdp::SubscriptionTopic::fromServiceAndEndpoint(message.serviceName, message.endpoint); // TODO avoid clone() for last message sent out for (const auto &[topic, _] : _subscribedTopics) { - if (_subscriptionMatcher(subscription, topic)) { - auto copy = message; - const auto subscriptionURI = mdp::Message::URI(std::string(subscription.path())); - copy.endpoint = subscriptionURI; - copy.sourceId = topic.toZmqTopic(); + if (_subscriptionMatcher(notification, topic)) { + auto copy = message; + copy.sourceId = topic.toZmqTopic(); zmq::send(std::move(copy), _pubSocket).assertSuccess(); const auto it = _subscribedClientsByTopic.find(topic); if (it != _subscribedClientsByTopic.end()) { for (const auto &clientId : it->second) { auto clientCopy = message; - clientCopy.endpoint = subscriptionURI; clientCopy.sourceId = clientId; zmq::send(std::move(clientCopy), _routerSocket).assertSuccess(); } @@ -899,7 +894,6 @@ class Broker { disconnect.command = mdp::Command::Disconnect; disconnect.sourceId = worker.id; disconnect.serviceName = worker.serviceName; - disconnect.endpoint = mdp::Message::URI(worker.serviceNameTopic); disconnect.data = IoBuffer("broker shutdown"); disconnect.rbac = _rbac; zmq::send(std::move(disconnect), worker.socket).assertSuccess(); diff --git a/src/majordomo/include/majordomo/RestBackend.hpp b/src/majordomo/include/majordomo/RestBackend.hpp index b353984d..c5079855 100644 --- a/src/majordomo/include/majordomo/RestBackend.hpp +++ b/src/majordomo/include/majordomo/RestBackend.hpp @@ -26,6 +26,7 @@ #include #include +#include #include // Majordomo @@ -215,7 +216,6 @@ struct SubscriptionInfo { std::string serviceName; std::string topicName; auto operator<=>(const SubscriptionInfo &other) const = default; - bool operator==(const SubscriptionInfo &other) const = default; }; struct Connection { @@ -465,19 +465,32 @@ class RestBackend : public Mode { virtual ~RestBackend() { _svr.stop(); + // shutdown thread before _connectionForService is destroyed + _connectionUpdaterThread.request_stop(); + _connectionUpdaterThread.join(); } auto handleServiceRequest(const httplib::Request &request, httplib::Response &response, const httplib::ContentReader *content_reader_ = nullptr) { using detail::RestMethod; - auto service = [&] { - if (!request.path.empty() && request.path[0] == '/') { - return std::string_view(request.path.begin() + 1, request.path.end()); - } else { - return std::string_view(request.path); + + auto convertParams = [](const httplib::Params ¶ms) { + mdp::SubscriptionTopic::Params r; + for (const auto &[key, value] : params) { + if (key == "LongPollingIdx" || key == "SubscriptionContext") { + continue; + } + if (value.empty()) { + r[key] = std::nullopt; + } else { + r[key] = value; + } } - }(); + return r; + }; + + auto subscription = mdp::SubscriptionTopic::fromServiceAndEndpointString(request.path, convertParams(request.params)); - if (service.empty()) { + if (subscription.service().empty()) { return detail::respondWithError(response, "Error: Service not specified\n"); } @@ -497,8 +510,6 @@ class RestBackend : public Mode { // clang-format on }(); - std::string subscriptionContext; - for (const auto &[key, value] : request.params) { if (key == "LongPollingIdx") { // This parameter is not passed on, it just means we @@ -506,7 +517,7 @@ class RestBackend : public Mode { restMethod = value == "Subscription" ? RestMethod::Subscribe : RestMethod::LongPoll; } else if (key == "SubscriptionContext") { - subscriptionContext = value; + subscription = mdp::SubscriptionTopic(subscription.service(), value, {}); // params are parsed from value } } @@ -519,13 +530,13 @@ class RestBackend : public Mode { switch (restMethod) { case RestMethod::Get: case RestMethod::Post: - return worker.respondWithPubSub(request, response, service, restMethod, content_reader_); + return worker.respondWithPubSub(request, response, subscription, restMethod, content_reader_); case RestMethod::LongPoll: - return worker.respondWithLongPoll(request, response, service, subscriptionContext); + return worker.respondWithLongPoll(request, response, subscription); case RestMethod::Subscribe: - return worker.respondWithSubscription(response, service, subscriptionContext); + return worker.respondWithSubscription(response, subscription); default: // std::unreachable() is C++23 @@ -608,7 +619,7 @@ struct RestBackend::RestWorker { return std::move(connection).unsafeMove(); } - bool respondWithPubSub(const httplib::Request &request, httplib::Response &response, const std::string_view &service, detail::RestMethod restMethod, const httplib::ContentReader *content_reader_ = nullptr) { + bool respondWithPubSub(const httplib::Request &request, httplib::Response &response, mdp::SubscriptionTopic subscription, detail::RestMethod restMethod, const httplib::ContentReader *content_reader_ = nullptr) { // clang-format off const mdp::Command mdpMessageCommand = restMethod == detail::RestMethod::Post ? mdp::Command::Set : @@ -643,14 +654,13 @@ struct RestBackend::RestWorker { mdp::Message message; message.protocolName = mdp::clientProtocol; message.command = mdpMessageCommand; - message.serviceName = std::string(service); const auto acceptedFormat = detail::acceptedMimeForRequest(request); - uri = std::move(uri).addQueryParameter("contentType", std::string(acceptedFormat)); - - auto topic = std::string(service) + uri.toString(); - - message.endpoint = mdp::Message::URI(topic); + auto params = subscription.params(); + params["contentType"] = std::string(acceptedFormat); + subscription = mdp::SubscriptionTopic(subscription.service(), subscription.path(), params); + message.serviceName = std::string(subscription.service()); + message.endpoint = subscription.toEndpoint(); if (request.is_multipart_form_data()) { if (content_reader_ != nullptr) { @@ -725,8 +735,8 @@ struct RestBackend::RestWorker { return true; } - bool respondWithSubscription(httplib::Response &response, const std::string_view &service, const std::string_view &topic) { - detail::SubscriptionInfo subscriptionInfo{ std::string(service), std::string(topic) }; + bool respondWithSubscription(httplib::Response &response, const mdp::SubscriptionTopic &subscription) { + detail::SubscriptionInfo subscriptionInfo{ std::string(subscription.service()), subscription.topic() }; auto *connection = restBackend.notificationSubscriptionConnectionFor(subscriptionInfo); assert(connection); @@ -755,11 +765,11 @@ struct RestBackend::RestWorker { return true; } - bool respondWithLongPollRedirect(const httplib::Request &request, httplib::Response &response, const std::string_view &subscriptionContext, detail::PollingIndex redirectLongPollingIdx) { + bool respondWithLongPollRedirect(const httplib::Request &request, httplib::Response &response, const mdp::SubscriptionTopic &subscription, detail::PollingIndex redirectLongPollingIdx) { auto uri = URI<>::factory() .path(request.path) .addQueryParameter("LongPollingIdx", std::to_string(redirectLongPollingIdx)) - .addQueryParameter("SubscriptionContext", std::string(subscriptionContext)); + .addQueryParameter("SubscriptionContext", subscription.topic()); // copy over the original query parameters addParameters(request, uri); @@ -769,12 +779,12 @@ struct RestBackend::RestWorker { return true; } - bool respondWithLongPoll(const httplib::Request &request, httplib::Response &response, const std::string_view &service, const std::string_view &topic) { + bool respondWithLongPoll(const httplib::Request &request, httplib::Response &response, const mdp::SubscriptionTopic &subscription) { // TODO: After the URIs are formalized, rethink service and topic auto uri = URI<>::factory(); addParameters(request, uri); - detail::SubscriptionInfo subscriptionInfo{ std::string(service), std::string(topic) }; + detail::SubscriptionInfo subscriptionInfo{ std::string(subscription.service()), subscription.topic() }; const auto longPollingIdxIt = request.params.find("LongPollingIdx"); if (longPollingIdxIt == request.params.end()) { @@ -818,19 +828,19 @@ struct RestBackend::RestWorker { response.set_header("Access-Control-Allow-Origin", "*"); if (longPollingIdxParam == "Next") { - return respondWithLongPollRedirect(request, response, topic, cache.nextPollingIndex); + return respondWithLongPollRedirect(request, response, subscription, cache.nextPollingIndex); } if (longPollingIdxParam == "Last") { if (cache.connection != nullptr) { - return respondWithLongPollRedirect(request, response, topic, cache.nextPollingIndex - 1); + return respondWithLongPollRedirect(request, response, subscription, cache.nextPollingIndex - 1); } else { - return respondWithLongPollRedirect(request, response, topic, cache.nextPollingIndex); + return respondWithLongPollRedirect(request, response, subscription, cache.nextPollingIndex); } } if (longPollingIdxParam == "FirstAvailable") { - return respondWithLongPollRedirect(request, response, topic, cache.firstCachedIndex); + return respondWithLongPollRedirect(request, response, subscription, cache.firstCachedIndex); } if (std::from_chars(longPollingIdxParam.data(), longPollingIdxParam.data() + longPollingIdxParam.size(), requestedLongPollingIdx).ec != std::errc{}) { diff --git a/src/majordomo/include/majordomo/Worker.hpp b/src/majordomo/include/majordomo/Worker.hpp index 759fc430..b5a4728d 100644 --- a/src/majordomo/include/majordomo/Worker.hpp +++ b/src/majordomo/include/majordomo/Worker.hpp @@ -60,7 +60,10 @@ using description = worker_detail::description_impl class BasicWorker { + // serviceName must be a non-empty string without slashes + // TODO clarify allowed format and enforce it static_assert(!serviceName.empty()); + static_assert(std::string_view(serviceName.data()).find("/") == std::string_view::npos); using Clock = std::chrono::steady_clock; using Timestamp = std::chrono::time_point; @@ -256,8 +259,10 @@ class BasicWorker { } const auto topicString = data.substr(1); - const auto topicUrl = URI(std::string(topicString)); - const auto subscription = mdp::SubscriptionTopic::fromURIAndServiceName(topicUrl, serviceName.data()); + const auto subscription = mdp::SubscriptionTopic::fromZmqTopic(topicString); + if (subscription.service() != std::string_view(serviceName.data())) { + return true; + } // this assumes that the broker does the subscribe/unsubscribe counting // for multiple clients and sends us a single sub/unsub for each topic @@ -274,7 +279,7 @@ class BasicWorker { bool receiveNotificationMessage() { if (auto message = zmq::receive(_notifyListenerSocket)) { - const auto currentSubscription = mdp::SubscriptionTopic::fromURIAndServiceName(message->endpoint, message->serviceName); + const auto currentSubscription = mdp::SubscriptionTopic::fromServiceAndEndpoint(message->serviceName, message->endpoint); const auto matchesNotificationTopic = [this, ¤tSubscription](const auto &activeSubscription) { return _subscriptionMatcher(currentSubscription, activeSubscription); @@ -454,9 +459,7 @@ inline void serialiseAndWriteToBody(RequestContext &rawCtx, const ReflectableCla inline void writeResult(std::string_view workerName, RequestContext &rawCtx, const auto &replyContext, const auto &output) { auto replyQuery = query::serialise(replyContext); const auto baseUri = rawCtx.reply.endpoint.empty() ? rawCtx.request.endpoint : rawCtx.reply.endpoint; - const auto topicUriOld = mdp::Message::URI::factory(baseUri).setQuery(std::move(replyQuery)).build(); - const auto topicUriNew = mdp::Message::URI::factory(baseUri).build(); - const auto &topicUri = topicUriOld; + const auto topicUri = mdp::Message::URI::factory(baseUri).setQuery(std::move(replyQuery)).build(); rawCtx.reply.endpoint = topicUri; const auto replyMimetype = query::getMimeType(replyContext); @@ -585,9 +588,14 @@ class Worker : public BasicWorker { return notify("/", context, reply); } - bool notify(std::string_view path, const ContextType &context, const OutputType &reply) { + bool notify(std::string_view path_, const ContextType &context, const OutputType &reply) { RequestContext rawCtx; - rawCtx.reply.endpoint = mdp::Message::URI(std::string(path)); + std::string path = [&path_] { + if (path_.starts_with("/")) + return std::string{ path_ }; + return "/" + std::string{ path_ }; + }(); + rawCtx.reply.endpoint = URI<>(path); worker_detail::writeResult(Worker::name, rawCtx, context, reply); return BasicWorker::notify(std::move(rawCtx.reply)); } diff --git a/src/majordomo/test/majordomo_tests.cpp b/src/majordomo/test/majordomo_tests.cpp index 59c5e744..86478741 100644 --- a/src/majordomo/test/majordomo_tests.cpp +++ b/src/majordomo/test/majordomo_tests.cpp @@ -428,7 +428,6 @@ TEST_CASE("One client/one worker roundtrip", "[broker][roundtrip]") { REQUIRE(disconnect->command == mdp::Command::Disconnect); REQUIRE(disconnect->serviceName == "a.service"); REQUIRE(disconnect->clientRequestID.empty()); - REQUIRE(disconnect->endpoint.str() == "/a.service"); REQUIRE(disconnect->data.asString() == "broker shutdown"); REQUIRE(disconnect->error.empty()); REQUIRE(disconnect->rbac.asString() == "RBAC=ADMIN,abcdef12345"); @@ -446,8 +445,8 @@ TEST_CASE("Test service matching", "[broker][name-matcher]") { REQUIRE(client.connect(opencmw::majordomo::INTERNAL_ADDRESS_BROKER)); auto ready = createWorkerMessage(mdp::Command::Ready); - ready.serviceName = "/DeviceA/dashboard"; - ready.data = IoBuffer("An example worker serving different dashbards"); + ready.serviceName = "dashboard"; + ready.data = IoBuffer("An example worker serving different dashboards"); ready.rbac = IoBuffer("rbacToken"); worker.send(std::move(ready)); @@ -455,9 +454,9 @@ TEST_CASE("Test service matching", "[broker][name-matcher]") { { auto request = createClientMessage(mdp::Command::Get); - request.serviceName = "/DeviceA/dashboard"; + request.serviceName = "dashboard"; request.clientRequestID = IoBuffer("1"); - request.endpoint = mdp::Message::URI("/DeviceA/dashboard"); + request.endpoint = mdp::Message::URI("/dashboard"); request.rbac = IoBuffer("rbacToken"); client.send(std::move(request)); @@ -469,7 +468,7 @@ TEST_CASE("Test service matching", "[broker][name-matcher]") { REQUIRE(requestAtWorker->command == mdp::Command::Get); REQUIRE(!requestAtWorker->serviceName.empty()); // clientSourceID REQUIRE(requestAtWorker->clientRequestID.asString() == "1"); - REQUIRE(requestAtWorker->endpoint.str() == "/DeviceA/dashboard"); + REQUIRE(requestAtWorker->endpoint.str() == "/dashboard"); REQUIRE(requestAtWorker->data.empty()); REQUIRE(requestAtWorker->error.empty()); REQUIRE(requestAtWorker->rbac.asString() == "rbacToken"); @@ -477,7 +476,7 @@ TEST_CASE("Test service matching", "[broker][name-matcher]") { auto replyFromWorker = createWorkerMessage(mdp::Command::Final); replyFromWorker.serviceName = requestAtWorker->serviceName; // clientSourceID replyFromWorker.clientRequestID = IoBuffer("1"); - replyFromWorker.endpoint = mdp::Message::URI("/DeviceA/dashboard/default"); + replyFromWorker.endpoint = mdp::Message::URI("/dashboard/default"); replyFromWorker.data = IoBuffer("Testreply"); replyFromWorker.rbac = IoBuffer("rbac_worker"); worker.send(std::move(replyFromWorker)); @@ -488,9 +487,9 @@ TEST_CASE("Test service matching", "[broker][name-matcher]") { REQUIRE(reply.has_value()); REQUIRE(reply->protocolName == mdp::clientProtocol); REQUIRE(reply->command == mdp::Command::Final); - REQUIRE(reply->serviceName == "/DeviceA/dashboard"); + REQUIRE(reply->serviceName == "dashboard"); REQUIRE(reply->clientRequestID.asString() == "1"); - REQUIRE(reply->endpoint.str() == "/DeviceA/dashboard/default"); + REQUIRE(reply->endpoint.str() == "/dashboard/default"); REQUIRE(reply->data.asString() == "Testreply"); REQUIRE(reply->error.empty()); REQUIRE(reply->rbac.asString() == "rbac_worker"); @@ -498,9 +497,9 @@ TEST_CASE("Test service matching", "[broker][name-matcher]") { { auto request = createClientMessage(mdp::Command::Get); - request.serviceName = "/DeviceA/dashboard/main"; + request.serviceName = "dashboard/main"; request.clientRequestID = IoBuffer("2"); - request.endpoint = mdp::Message::URI("/DeviceA/dashboard/main?revision=12"); + request.endpoint = mdp::Message::URI("/dashboard/main?revision=12"); request.rbac = IoBuffer("rbacToken"); client.send(std::move(request)); @@ -512,7 +511,7 @@ TEST_CASE("Test service matching", "[broker][name-matcher]") { REQUIRE(requestAtWorker->command == mdp::Command::Get); REQUIRE(!requestAtWorker->serviceName.empty()); // clientSourceID REQUIRE(requestAtWorker->clientRequestID.asString() == "2"); - REQUIRE(requestAtWorker->endpoint.str() == "/DeviceA/dashboard/main?revision=12"); + REQUIRE(requestAtWorker->endpoint.str() == "/dashboard/main?revision=12"); REQUIRE(requestAtWorker->data.empty()); REQUIRE(requestAtWorker->error.empty()); REQUIRE(requestAtWorker->rbac.asString() == "rbacToken"); @@ -520,7 +519,7 @@ TEST_CASE("Test service matching", "[broker][name-matcher]") { auto replyFromWorker = createWorkerMessage(mdp::Command::Final); replyFromWorker.serviceName = requestAtWorker->serviceName; // clientSourceID replyFromWorker.clientRequestID = IoBuffer("2"); - replyFromWorker.endpoint = mdp::Message::URI("/DeviceA/dashboard/main?revision=12"); + replyFromWorker.endpoint = mdp::Message::URI("/dashboard/main?revision=12"); replyFromWorker.data = IoBuffer("Testreply"); replyFromWorker.rbac = IoBuffer("rbac_worker"); worker.send(std::move(replyFromWorker)); @@ -531,9 +530,9 @@ TEST_CASE("Test service matching", "[broker][name-matcher]") { REQUIRE(reply.has_value()); REQUIRE(reply->protocolName == mdp::clientProtocol); REQUIRE(reply->command == mdp::Command::Final); - REQUIRE(reply->serviceName == "/DeviceA/dashboard"); + REQUIRE(reply->serviceName == "dashboard"); REQUIRE(reply->clientRequestID.asString() == "2"); - REQUIRE(reply->endpoint.str() == "/DeviceA/dashboard/main?revision=12"); + REQUIRE(reply->endpoint.str() == "/dashboard/main?revision=12"); REQUIRE(reply->data.asString() == "Testreply"); REQUIRE(reply->error.empty()); REQUIRE(reply->rbac.asString() == "rbac_worker"); @@ -546,9 +545,8 @@ TEST_CASE("Test service matching", "[broker][name-matcher]") { REQUIRE(disconnect.has_value()); REQUIRE(disconnect->protocolName == mdp::workerProtocol); REQUIRE(disconnect->command == mdp::Command::Disconnect); - REQUIRE(disconnect->serviceName == "/DeviceA/dashboard"); + REQUIRE(disconnect->serviceName == "dashboard"); REQUIRE(disconnect->clientRequestID.empty()); - REQUIRE(disconnect->endpoint.str() == "//DeviceA/dashboard"); REQUIRE(disconnect->data.asString() == "broker shutdown"); REQUIRE(disconnect->error.empty()); REQUIRE(disconnect->rbac.asString() == "RBAC=ADMIN,abcdef12345"); @@ -565,8 +563,8 @@ TEST_CASE("Pubsub example using SUB client/DEALER worker", "[broker][pubsub_sub_ REQUIRE(broker.bind(publisherAddress, BindOption::Pub)); BrokerMessageNode subscriber(broker.context, ZMQ_SUB); - REQUIRE(subscriber.connect(publisherAddress, mdp::SubscriptionTopic("/a.topic"))); - REQUIRE(subscriber.subscribe(mdp::SubscriptionTopic("/other.*"))); + REQUIRE(subscriber.connect(publisherAddress, "/a.service/a.topic")); + REQUIRE(subscriber.subscribe("/a.service/other.*")); broker.processMessages(); @@ -613,7 +611,7 @@ TEST_CASE("Pubsub example using SUB client/DEALER worker", "[broker][pubsub_sub_ const auto reply = subscriber.tryReadOne(); REQUIRE(reply.has_value()); REQUIRE(reply->protocolName == mdp::clientProtocol); - REQUIRE(reply->sourceId == "/a.topic"); + REQUIRE(reply->sourceId == "/a.service/a.topic"); REQUIRE(reply->serviceName == "a.service"); REQUIRE(reply->clientRequestID.empty()); REQUIRE(reply->data.asString() == "Notification about /a.topic"); @@ -625,7 +623,7 @@ TEST_CASE("Pubsub example using SUB client/DEALER worker", "[broker][pubsub_sub_ const auto reply = subscriber.tryReadOne(); REQUIRE(reply.has_value()); REQUIRE(reply->protocolName == mdp::clientProtocol); - REQUIRE(reply->sourceId == "/other.*"); + REQUIRE(reply->sourceId == "/a.service/other.*"); REQUIRE(reply->serviceName == "a.service"); REQUIRE(reply->clientRequestID.empty()); REQUIRE(reply->data.asString() == "Notification about /other.topic"); @@ -949,11 +947,11 @@ TEST_CASE("pubsub example using PUB socket (SUB client)", "[broker][pubsub_subcl MessageNode publisherTwo(broker.context); REQUIRE(publisherTwo.connect(opencmw::majordomo::INTERNAL_ADDRESS_BROKER)); - subscriber.subscribe(mdp::SubscriptionTopic("/cooking.italian*")); + subscriber.subscribe("/first.service/cooking.italian*"); broker.processMessages(); - subscriber.subscribe(mdp::SubscriptionTopic("/cooking.indian*")); + subscriber.subscribe("/second.service/cooking.indian*"); broker.processMessages(); @@ -974,7 +972,7 @@ TEST_CASE("pubsub example using PUB socket (SUB client)", "[broker][pubsub_subcl const auto reply = subscriber.tryReadOne(); REQUIRE(reply.has_value()); REQUIRE(reply->protocolName == mdp::clientProtocol); - REQUIRE(reply->sourceId == "/cooking.italian*"); + REQUIRE(reply->sourceId == "/first.service/cooking.italian*"); REQUIRE(reply->command == mdp::Command::Final); REQUIRE(reply->serviceName == "first.service"); REQUIRE(reply->clientRequestID.empty()); @@ -1001,7 +999,7 @@ TEST_CASE("pubsub example using PUB socket (SUB client)", "[broker][pubsub_subcl const auto reply = subscriber.tryReadOne(); REQUIRE(reply.has_value()); REQUIRE(reply->protocolName == mdp::clientProtocol); - REQUIRE(reply->sourceId == "/cooking.indian*"); + REQUIRE(reply->sourceId == "/second.service/cooking.indian*"); REQUIRE(reply->command == mdp::Command::Final); REQUIRE(reply->serviceName == "second.service"); REQUIRE(reply->clientRequestID.empty()); @@ -1011,7 +1009,7 @@ TEST_CASE("pubsub example using PUB socket (SUB client)", "[broker][pubsub_subcl REQUIRE(reply->rbac.asString() == "rbac_worker_2"); } - subscriber.unsubscribe(mdp::SubscriptionTopic("/cooking.italian*")); + subscriber.unsubscribe("/first.service/cooking.italian*"); broker.processMessages(); @@ -1045,7 +1043,7 @@ TEST_CASE("pubsub example using PUB socket (SUB client)", "[broker][pubsub_subcl const auto reply = subscriber.tryReadOne(); REQUIRE(reply.has_value()); REQUIRE(reply->protocolName == mdp::clientProtocol); - REQUIRE(reply->sourceId == "/cooking.indian*"); + REQUIRE(reply->sourceId == "/second.service/cooking.indian*"); REQUIRE(reply->command == mdp::Command::Final); REQUIRE(reply->serviceName == "second.service"); REQUIRE(reply->clientRequestID.empty()); @@ -1236,13 +1234,13 @@ TEST_CASE("BasicWorker SET/GET example with RBAC permission handling", "[worker] using opencmw::majordomo::description; Broker broker("testbroker", testSettings()); - BasicWorker<"/a.service", description<"API description">, rbac> worker(broker, TestIntHandler(10)); + BasicWorker<"a.service", description<"API description">, rbac> worker(broker, TestIntHandler(10)); REQUIRE(worker.serviceDescription() == "API description"); RunInThread brokerRun(broker); RunInThread workerRun(worker); - REQUIRE(waitUntilServiceAvailable(broker.context, "/a.service")); + REQUIRE(waitUntilServiceAvailable(broker.context, "a.service")); MessageNode writer(broker.context); REQUIRE(writer.connect(opencmw::majordomo::INTERNAL_ADDRESS_BROKER)); @@ -1250,7 +1248,7 @@ TEST_CASE("BasicWorker SET/GET example with RBAC permission handling", "[worker] // writer is allowed to SET { auto set = createClientMessage(mdp::Command::Set); - set.serviceName = "/a.service"; + set.serviceName = "a.service"; set.clientRequestID = IoBuffer("1"); set.endpoint = mdp::Message::URI("/topic"); set.data = IoBuffer("42"); @@ -1268,7 +1266,7 @@ TEST_CASE("BasicWorker SET/GET example with RBAC permission handling", "[worker] // writer is not allowed to GET { auto get = createClientMessage(mdp::Command::Get); - get.serviceName = "/a.service"; + get.serviceName = "a.service"; get.clientRequestID = IoBuffer("2"); get.endpoint = mdp::Message::URI("/topic"); get.rbac = IoBuffer("RBAC=WRITER,1234"); @@ -1288,7 +1286,7 @@ TEST_CASE("BasicWorker SET/GET example with RBAC permission handling", "[worker] // reader is not allowed to SET { auto set = createClientMessage(mdp::Command::Set); - set.serviceName = "/a.service"; + set.serviceName = "a.service"; set.clientRequestID = IoBuffer("1"); set.endpoint = mdp::Message::URI("/topic"); set.data = IoBuffer("42"); @@ -1306,7 +1304,7 @@ TEST_CASE("BasicWorker SET/GET example with RBAC permission handling", "[worker] // reader is allowed to GET { auto get = createClientMessage(mdp::Command::Get); - get.serviceName = "/a.service"; + get.serviceName = "a.service"; get.clientRequestID = IoBuffer("2"); get.endpoint = mdp::Message::URI("/topic"); get.rbac = IoBuffer("RBAC=READER,1234"); @@ -1326,7 +1324,7 @@ TEST_CASE("BasicWorker SET/GET example with RBAC permission handling", "[worker] // admin is allowed to SET { auto set = createClientMessage(mdp::Command::Set); - set.serviceName = "/a.service"; + set.serviceName = "a.service"; set.clientRequestID = IoBuffer("1"); set.endpoint = mdp::Message::URI("/topic"); set.data = IoBuffer("42"); @@ -1344,7 +1342,7 @@ TEST_CASE("BasicWorker SET/GET example with RBAC permission handling", "[worker] // admin is allowed to GET { auto get = createClientMessage(mdp::Command::Get); - get.serviceName = "/a.service"; + get.serviceName = "a.service"; get.clientRequestID = IoBuffer("2"); get.endpoint = mdp::Message::URI("/topic"); get.rbac = IoBuffer("RBAC=ADMIN,1234"); @@ -1378,9 +1376,9 @@ TEST_CASE("NOTIFY example using the BasicWorker class", "[worker][notify_basic_w REQUIRE(client.sendRawFrame("\x1")); REQUIRE(client.sendRawFrame("\x0"s)); - // subscribe to /wine* and /beer* - REQUIRE(client.sendRawFrame("\x1/wine*")); - REQUIRE(client.sendRawFrame("\x1/beer*")); + // subscribe to /beverages/wine* and /beverages/beer* + REQUIRE(client.sendRawFrame("\x1/beverages/wine*")); + REQUIRE(client.sendRawFrame("\x1/beverages/beer*")); bool seenNotification = false; @@ -1400,7 +1398,7 @@ TEST_CASE("NOTIFY example using the BasicWorker class", "[worker][notify_basic_w seenNotification = true; REQUIRE(notification->protocolName == mdp::clientProtocol); REQUIRE(notification->command == mdp::Command::Final); - REQUIRE(notification->sourceId == "/beer*"); + REQUIRE(notification->sourceId == "/beverages/beer*"); REQUIRE(notification->endpoint.str() == "/beer.time"); REQUIRE(notification->data.asString() == "Have a beer"); } @@ -1427,7 +1425,7 @@ TEST_CASE("NOTIFY example using the BasicWorker class", "[worker][notify_basic_w REQUIRE(notification->protocolName == mdp::clientProtocol); REQUIRE(notification->command == mdp::Command::Final); - REQUIRE(notification->sourceId == "/beer*"); + REQUIRE(notification->sourceId == "/beverages/beer*"); REQUIRE(notification->endpoint.str() == "/beer.error"); REQUIRE(notification->error == "Fridge empty!"); seenError = true; @@ -1447,13 +1445,13 @@ TEST_CASE("NOTIFY example using the BasicWorker class", "[worker][notify_basic_w REQUIRE(notification.has_value()); REQUIRE(notification->protocolName == mdp::clientProtocol); REQUIRE(notification->command == mdp::Command::Final); - REQUIRE(notification->sourceId == "/wine*"); + REQUIRE(notification->sourceId == "/beverages/wine*"); REQUIRE(notification->endpoint.str() == "/wine.italian"); REQUIRE(notification->data.asString() == "Try our Chianti!"); } // unsubscribe from /beer* - REQUIRE(client.sendRawFrame("\x0/beer*"s)); + REQUIRE(client.sendRawFrame("\x0/beverages/beer*"s)); // loop until we get two consecutive messages about wine, it means that the beer unsubscribe was processed while (true) { @@ -1478,19 +1476,19 @@ TEST_CASE("NOTIFY example using the BasicWorker class", "[worker][notify_basic_w const auto msg1 = client.tryReadOne(); REQUIRE(msg1.has_value()); - REQUIRE(msg1->sourceId == "/wine*"); + REQUIRE(msg1->sourceId == "/beverages/wine*"); const auto msg2 = client.tryReadOne(); REQUIRE(msg2.has_value()); - if (msg2->sourceId == "/wine*") { + if (msg2->sourceId == "/beverages/wine*") { break; } - REQUIRE(msg2->sourceId == "/beer*"); + REQUIRE(msg2->sourceId == "/beverages/beer*"); const auto msg3 = client.tryReadOne(); REQUIRE(msg3.has_value()); - REQUIRE(msg3->sourceId == "/wine*"); + REQUIRE(msg3->sourceId == "/beverages/wine*"); } } diff --git a/src/majordomo/test/majordomoworker_rest_tests.cpp b/src/majordomo/test/majordomoworker_rest_tests.cpp index e71d793e..de6bb6ff 100644 --- a/src/majordomo/test/majordomoworker_rest_tests.cpp +++ b/src/majordomo/test/majordomoworker_rest_tests.cpp @@ -12,24 +12,30 @@ #include #include +#include #include #include // Concepts and tests use common types #include -std::jthread makeGetRequestResponseCheckerThread(const std::string &address, const std::string &requiredResponse, [[maybe_unused]] std::source_location location = std::source_location::current()) { +std::jthread makeGetRequestResponseCheckerThread(const std::string &address, const std::vector &requiredResponses, [[maybe_unused]] std::source_location location = std::source_location::current()) { return std::jthread([=] { httplib::Client http("localhost", majordomo::DEFAULT_REST_PORT); + http.set_follow_location(true); http.set_keep_alive(true); - const auto response = http.Get(address.data()); - #define requireWithSource(arg) \ if (!(arg)) opencmw::zmq::debug::withLocation(location) << "<- call got a failed requirement:"; \ REQUIRE(arg) - requireWithSource(response); - requireWithSource(response->status == 200); - requireWithSource(response->body.find(requiredResponse) != std::string::npos); + for (const auto &requiredResponse : requiredResponses) { + const auto response = http.Get(address); + requireWithSource(response); + if (response->status != 200) { + fmt::println(std::cerr, "Unexpected error: {}", response->body); + } + requireWithSource(response->status == 200); + requireWithSource(response->body.find(requiredResponse) != std::string::npos); + } #undef requireWithSource }); } @@ -59,16 +65,16 @@ TEST_CASE("Simple MajordomoWorker example showing its usage", "[majordomo][major REQUIRE(waitUntilServiceAvailable(broker.context, "addressbook")); SECTION("request Address information as JSON and as HTML") { - auto httpThreadJSON = makeGetRequestResponseCheckerThread("/addressbook?ctx=FAIR.SELECTOR.ALL&contentType=application%2Fjavascript", "Santa Claus"); + auto httpThreadJSON = makeGetRequestResponseCheckerThread("/addressbook?ctx=FAIR.SELECTOR.ALL&contentType=application%2Fjavascript", { "Santa Claus" }); - auto httpThreadHTML = makeGetRequestResponseCheckerThread("/addressbook?ctx=FAIR.SELECTOR.ALL&contentType=text%2Fhtml", "Elf Road"); + auto httpThreadHTML = makeGetRequestResponseCheckerThread("/addressbook?ctx=FAIR.SELECTOR.ALL&contentType=text%2Fhtml", { "Elf Road" }); } SECTION("post data") { httplib::Client postData{ "http://localhost:8080" }; postData.Post("/addressbook?ctx=FAIR.SELECTOR.ALL&contentType=application%json", "{\"streetNumber\": 1882}", "application/json"); - auto httpThreadJSON = makeGetRequestResponseCheckerThread("/addressbook?ctx=FAIR.SELECTOR.ALL&contentType=application%json", "1882"); + auto httpThreadJSON = makeGetRequestResponseCheckerThread("/addressbook?ctx=FAIR.SELECTOR.ALL&contentType=application%json", { "1882" }); } SECTION("post data as multipart") { @@ -95,8 +101,83 @@ TEST_CASE("Simple MajordomoWorker example showing its usage", "[majordomo][major CAPTURE(r->body); REQUIRE(r->status == 200); - auto httpThreadJSON = makeGetRequestResponseCheckerThread("/addressbook?ctx=FAIR.SELECTOR.ALL&contentType=application%2Fjavascript", "Kalle"); + auto httpThreadJSON = makeGetRequestResponseCheckerThread("/addressbook?ctx=FAIR.SELECTOR.ALL&contentType=application%2Fjavascript", { "Kalle" }); } }; } } + +struct ColorContext { + bool red = false; + bool green = false; + bool blue = false; + opencmw::MIME::MimeType contentType = opencmw::MIME::JSON; +}; + +ENABLE_REFLECTION_FOR(ColorContext, red, green, blue, contentType) + +struct SingleString { + std::string value; +}; +ENABLE_REFLECTION_FOR(SingleString, value) + +template +class ColorWorker : public majordomo::Worker { + std::jthread notifyThread; + +public: + using super_t = majordomo::Worker; + + template + explicit ColorWorker(const BrokerType &broker, std::vector notificationContexts) + : super_t(broker, {}) { + notifyThread = std::jthread([this, contexts = std::move(notificationContexts)]() { + int counter = 0; + for (const auto &context : contexts) { + std::this_thread::sleep_for(150ms); + super_t::notify(context, { std::to_string(counter) }); + counter++; + } + }); + } +}; + +TEST_CASE("Subscriptions", "[majordomo][majordomoworker][subscription]") { + majordomo::Broker broker("TestBroker", testSettings()); + auto fs = cmrc::assets::get_filesystem(); + FileServerRestBackend rest(broker, fs); + RunInThread restServerRun(rest); + + opencmw::query::registerTypes(ColorContext(), broker); + + constexpr auto red = ColorContext{ .red = true }; + constexpr auto green = ColorContext{ .green = true }; + constexpr auto blue = ColorContext{ .blue = true }; + constexpr auto magenta = ColorContext{ .red = true, .blue = true }; + constexpr auto yellow = ColorContext{ .red = true, .green = true }; + constexpr auto black = ColorContext{}; + constexpr auto white = ColorContext{ .red = true, .green = true, .blue = true }; + + ColorWorker<"notifications"> worker(broker, { red, green, blue, magenta, yellow, black, white }); + + RunInThread brokerRun(broker); + RunInThread workerRun(worker); + + REQUIRE(waitUntilServiceAvailable(broker.context, "notifications")); + + auto allListener = makeGetRequestResponseCheckerThread("/notifications?LongPollingIdx=Next", { "0", "1", "2", "3", "4", "5", "6" }); + auto redListener = makeGetRequestResponseCheckerThread("/notifications?LongPollingIdx=Next&red", { "0", "3", "4", "6" }); + auto yellowListener = makeGetRequestResponseCheckerThread("/notifications?LongPollingIdx=Next&red&green", { "4", "6" }); + auto whiteListener1 = makeGetRequestResponseCheckerThread("/notifications?LongPollingIdx=Next&red&green&blue", { "6" }); + auto whiteListener2 = makeGetRequestResponseCheckerThread("/notifications?LongPollingIdx=Next&green&red&blue", { "6" }); + auto whiteListener3 = makeGetRequestResponseCheckerThread("/notifications?LongPollingIdx=Next&blue&green&red", { "6" }); + + std::this_thread::sleep_for(50ms); // give time for subscriptions to happen + + std::vector subscriptions; + for (const auto &subscription : worker.activeSubscriptions()) { + subscriptions.push_back(subscription.toZmqTopic()); + } + std::ranges::sort(subscriptions); + REQUIRE(subscriptions == std::vector{ "/notifications", "/notifications?blue&green&red", "/notifications?green&red", "/notifications?red" }); +} diff --git a/src/majordomo/test/majordomoworker_tests.cpp b/src/majordomo/test/majordomoworker_tests.cpp index 83a0735f..e60d4db8 100644 --- a/src/majordomo/test/majordomoworker_tests.cpp +++ b/src/majordomo/test/majordomoworker_tests.cpp @@ -185,7 +185,7 @@ TEST_CASE("MajordomoWorker test using raw messages", "[majordomo][majordomoworke BrokerMessageNode subClient(broker.context, ZMQ_SUB); REQUIRE(client.connect(opencmw::majordomo::INTERNAL_ADDRESS_BROKER)); REQUIRE(subClient.connect(opencmw::majordomo::INTERNAL_ADDRESS_PUBLISHER)); - REQUIRE(subClient.subscribe(mdp::SubscriptionTopic("/newAddress?ctx=FAIR.SELECTOR.C%3D1"))); + REQUIRE(subClient.subscribe("/addressbook/newAddress?ctx=FAIR.SELECTOR.C%3D1")); { auto request = createClientMessage(mdp::Command::Get); @@ -313,15 +313,15 @@ TEST_CASE("MajordomoWorker test using raw messages", "[majordomo][majordomoworke REQUIRE(worker.activeSubscriptions().size() == 1); REQUIRE(worker.activeSubscriptions().begin()->service() == "addressbook"); REQUIRE(worker.activeSubscriptions().begin()->path() == "/newAddress"); - REQUIRE(worker.activeSubscriptions().begin()->params() == SubscriptionTopic::map{ { "ctx", "FAIR.SELECTOR.C=1" } }); + REQUIRE(worker.activeSubscriptions().begin()->params() == SubscriptionTopic::Params{ { "ctx", "FAIR.SELECTOR.C=1" } }); } { const auto notify = subClient.tryReadOne(); REQUIRE(notify.has_value()); REQUIRE(notify->command == mdp::Command::Final); - REQUIRE(notify->sourceId == "/newAddress?ctx=FAIR.SELECTOR.C%3D1"); - REQUIRE(notify->endpoint.str() == "/newAddress"); + REQUIRE(notify->sourceId == "/addressbook/newAddress?ctx=FAIR.SELECTOR.C%3D1"); + REQUIRE(notify->endpoint.str() == "/newAddress?contentType=application%2Fjson&ctx=FAIR.SELECTOR.C%3D1"); REQUIRE(notify->error.empty()); REQUIRE(notify->data.asString() == "{\n\"name\": \"Easter Bunny\",\n\"street\": \"Carrot Road\",\n\"streetNumber\": 123,\n\"postalCode\": \"88888\",\n\"city\": \"Easter Island\",\n\"isCurrent\": true\n}"); } diff --git a/src/majordomo/test/subscriptionmatcher_tests.cpp b/src/majordomo/test/subscriptionmatcher_tests.cpp index a6c8314c..3a16c5bc 100644 --- a/src/majordomo/test/subscriptionmatcher_tests.cpp +++ b/src/majordomo/test/subscriptionmatcher_tests.cpp @@ -14,7 +14,7 @@ using opencmw::mdp::SubscriptionTopic; struct SubscriptionUriMatcher : SubscriptionMatcher { bool operator()(const auto ¬ified, const auto &subscriber) const { - return SubscriptionMatcher::operator()(SubscriptionTopic::fromURI(notified), SubscriptionTopic::fromURI(subscriber)); + return SubscriptionMatcher::operator()(SubscriptionTopic::fromEndpoint(notified), SubscriptionTopic::fromEndpoint(subscriber)); } };