diff --git a/src/majordomo/include/majordomo/RestBackend.hpp b/src/majordomo/include/majordomo/RestBackend.hpp index 3d627e30..0734803e 100644 --- a/src/majordomo/include/majordomo/RestBackend.hpp +++ b/src/majordomo/include/majordomo/RestBackend.hpp @@ -62,9 +62,10 @@ using namespace std::chrono_literals; constexpr auto HTTP_OK = 200; constexpr auto HTTP_ERROR = 500; +constexpr auto HTTP_GATEWAY_TIMEOUT = 504; constexpr auto DEFAULT_REST_PORT = 8080; -constexpr auto REST_POLLING_TIME = 10s; constexpr auto UPDATER_POLLING_TIME = 1s; +constexpr auto LONG_POLL_SERVER_TIMEOUT = 30s; constexpr auto UNUSED_SUBSCRIPTION_EXPIRATION_TIME = 30s; constexpr std::size_t MAX_CACHED_REPLIES = 32; @@ -155,8 +156,8 @@ inline std::string_view acceptedMimeForRequest(const auto &request) { return acceptableMimeTypes[0]; } -bool respondWithError(auto &response, std::string_view message) { - response.status = HTTP_ERROR; +bool respondWithError(auto &response, std::string_view message, int status = HTTP_ERROR) { + response.status = status; response.set_content(message.data(), MIME::TEXT.typeName().data()); return true; }; @@ -281,9 +282,18 @@ struct Connection { return ReadLock(_cachedRepliesMutex); } - void waitForUpdate() { - auto temporaryLock = writeLock(); - _pollingIndexCV.wait(temporaryLock); + bool waitForUpdate(std::chrono::milliseconds timeout) { + // This could also periodically check for the client connection being dropped (e.g. due to client-side timeout) + // if cpp-httplib had API for that. + auto temporaryLock = writeLock(); + const auto next = _nextPollingIndex; + while (_nextPollingIndex == next) { + if (_pollingIndexCV.wait_for(temporaryLock, timeout) == std::cv_status::timeout) { + return false; + } + } + + return true; } std::size_t cachedRepliesSize(ReadLock & /*lock*/) const { @@ -315,9 +325,10 @@ struct Connection { template class RestBackend : public Mode { protected: - Broker &_broker; - const VirtualFS &_vfs; - URI<> _restAddress; + Broker &_broker; + const VirtualFS &_vfs; + URI<> _restAddress; + std::atomic _majordomoTimeout = 30000ms; private: std::jthread _connectionUpdaterThread; @@ -325,6 +336,19 @@ class RestBackend : public Mode { std::map> _connectionForService; public: + /** + * Timeout used for interaction with majordomo workers, i.e. the time to wait + * for notifications on subscriptions (long-polling) and for responses to Get/Set + * requests. + */ + void setMajordomoTimeout(std::chrono::milliseconds timeout) { + _majordomoTimeout = timeout; + } + + std::chrono::milliseconds majordomoTimeout() const { + return _majordomoTimeout; + } + using BrokerType = Broker; // returns a connection with refcount 1. Make sure you lower it to // zero at some point @@ -705,9 +729,9 @@ struct RestBackend::RestWorker { } pollItem.socket = connection.notificationSubscriptionSocket.zmq_ptr; - auto pollResult = zmq::invoke(zmq_poll, &pollItem, 1, std::chrono::duration_cast(REST_POLLING_TIME).count()); + auto pollResult = zmq::invoke(zmq_poll, &pollItem, 1, std::chrono::duration_cast(restBackend.majordomoTimeout()).count()); if (!pollResult || pollResult.value() == 0) { - detail::respondWithError(response, "Error: No response from broker\n"); + detail::respondWithError(response, "Error: No response from broker\n", HTTP_GATEWAY_TIMEOUT); } else if (auto responseMessage = zmq::receive(connection.notificationSubscriptionSocket); !responseMessage) { detail::respondWithError(response, "Error: Empty response from broker\n"); } else if (!responseMessage->error.empty()) { @@ -733,14 +757,16 @@ struct RestBackend::RestWorker { const auto subscriptionKey = subscription.toZmqTopic(); auto *connection = restBackend.notificationSubscriptionConnectionFor(subscriptionKey); assert(connection); - + const auto majordomoTimeout = restBackend.majordomoTimeout(); response.set_header("Access-Control-Allow-Origin", "*"); response.set_chunked_content_provider( "application/json", - [connection](std::size_t /*offset*/, httplib::DataSink &sink) mutable { + [connection, majordomoTimeout](std::size_t /*offset*/, httplib::DataSink &sink) mutable { std::cerr << "Chunked reply...\n"; - connection->waitForUpdate(); + if (!connection->waitForUpdate(majordomoTimeout)) { + return false; + } auto connectionCacheLock = connection->readLock(); auto lastIndex = connection->nextPollingIndex(connectionCacheLock) - 1; @@ -864,7 +890,9 @@ struct RestBackend::RestWorker { // Since we use KeepAlive object, the inital refCount can go away connection->decreaseReferenceCount(); - connection->waitForUpdate(); + if (!connection->waitForUpdate(restBackend.majordomoTimeout())) { + return detail::respondWithError(response, "Timeout waiting for update", HTTP_GATEWAY_TIMEOUT); + } const auto newCache = fetchCache(); diff --git a/src/majordomo/test/majordomoworker_rest_tests.cpp b/src/majordomo/test/majordomoworker_rest_tests.cpp index 7e44b6bd..c6220e7d 100644 --- a/src/majordomo/test/majordomoworker_rest_tests.cpp +++ b/src/majordomo/test/majordomoworker_rest_tests.cpp @@ -101,6 +101,29 @@ class PathWorker : public majordomo::Worker +class WaitingWorker : public majordomo::Worker { +public: + using super_t = majordomo::Worker; + + template + explicit WaitingWorker(const BrokerType &broker) + : super_t(broker, {}) { + super_t::setCallback([](majordomo::RequestContext &, const WaitingContext &inCtx, const SingleString &in, WaitingContext &outCtx, SingleString &out) { + fmt::println("Sleep for {}", inCtx.timeoutMs); + std::this_thread::sleep_for(std::chrono::milliseconds(inCtx.timeoutMs)); + outCtx = inCtx; + out.value = fmt::format("You said: {}", in.value); + }); + } +}; + TEST_CASE("Simple MajordomoWorker example showing its usage", "[majordomo][majordomoworker][simple_example]") { // We run both broker and worker inproc majordomo::Broker broker("/TestBroker", testSettings()); @@ -245,3 +268,51 @@ TEST_CASE("Subscriptions", "[majordomo][majordomoworker][subscription]") { std::ranges::sort(subscriptions); REQUIRE(subscriptions == std::vector{ "/colors", "/colors?blue&green&red", "/colors?green&red", "/colors?red" }); } + +TEST_CASE("Majordomo timeouts", "[majordomo][majordomoworker][rest]") { + majordomo::Broker broker("/TestBroker", testSettings()); + auto fs = cmrc::assets::get_filesystem(); + FileServerRestBackend rest(broker, fs); + RunInThread restServerRun(rest); + + opencmw::query::registerTypes(WaitingContext(), broker); + + WaitingWorker<"/waiter"> worker(broker); + + RunInThread brokerRun(broker); + RunInThread workerRun(worker); + + REQUIRE(waitUntilWorkerServiceAvailable(broker.context, worker)); + + // set timeout to unit-test friendly interval + rest.setMajordomoTimeout(800ms); + + SECTION("Waiting for notification that doesn't happen in time returns 504 message") { + std::vector clientThreads; + for (int i = 0; i < 16; ++i) { + clientThreads.push_back(makeGetRequestResponseCheckerThread("/waiter?LongPollingIdx=Next", { "Timeout" }, { 504 })); + } + } + + SECTION("Waiting for notification that happens in time gives expected response") { + auto client = makeGetRequestResponseCheckerThread("/waiter?LongPollingIdx=Next", { "This is a notification" }); + std::this_thread::sleep_for(400ms); + worker.notify({}, { "This is a notification" }); + } + + SECTION("Response to request takes too long, timeout status is returned") { + httplib::Client postData{ "http://localhost:8080" }; + auto reply = postData.Post("/waiter?contentType=application%2Fjson&timeoutMs=1200", "{\"value\": \"Hello!\"}", "application/json"); + REQUIRE(reply); + REQUIRE(reply->status == 504); + REQUIRE(reply->body.find("No response") != std::string::npos); + } + + SECTION("Response to request arrives in time") { + httplib::Client postData{ "http://localhost:8080" }; + auto reply = postData.Post("/waiter?contentType=application%2Fjson&timeoutMs=0", "{\"value\": \"Hello!\"}", "application/json"); + REQUIRE(reply); + REQUIRE(reply->status == 200); + REQUIRE(reply->body.find("You said: Hello!") != std::string::npos); + } +}