Skip to content

Commit

Permalink
Do not block worker threads forever if no event is received (#329)
Browse files Browse the repository at this point in the history
Make sure that long-poll request handlers do not block forever when no
corresponding event is received. Otherwise the clients will send one
request after another once their request times out client-side, until
the worker threads are exhausted and the server stops responding.

It would be great if we could also detect the client connection being
dropped using Keep-Alive, but cpp-httplib doesn't seem to have an API
for that.
  • Loading branch information
frankosterfeld committed Dec 15, 2023
1 parent 5f9a53e commit 391b2df
Show file tree
Hide file tree
Showing 2 changed files with 114 additions and 15 deletions.
58 changes: 43 additions & 15 deletions src/majordomo/include/majordomo/RestBackend.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,10 @@ using namespace std::chrono_literals;

// HTTP status codes written to REST responses.
constexpr auto HTTP_OK = 200;
constexpr auto HTTP_ERROR = 500;
// Returned when waiting for the broker or for a subscription update times out.
constexpr auto HTTP_GATEWAY_TIMEOUT = 504;
constexpr auto DEFAULT_REST_PORT = 8080;
constexpr auto REST_POLLING_TIME = 10s;
constexpr auto UPDATER_POLLING_TIME = 1s;
constexpr auto LONG_POLL_SERVER_TIMEOUT = 30s;
constexpr auto UNUSED_SUBSCRIPTION_EXPIRATION_TIME = 30s;
constexpr std::size_t MAX_CACHED_REPLIES = 32;

Expand Down Expand Up @@ -155,8 +156,8 @@ inline std::string_view acceptedMimeForRequest(const auto &request) {
return acceptableMimeTypes[0];
}

bool respondWithError(auto &response, std::string_view message) {
response.status = HTTP_ERROR;
/**
 * Turns `response` into a plain-text error reply.
 *
 * @param response response object to fill; must expose `status` and `set_content`
 * @param message  body text of the error reply
 * @param status   HTTP status code to report, HTTP_ERROR by default
 * @return always true
 */
bool respondWithError(auto &response, std::string_view message, int status = HTTP_ERROR) {
    response.status = status;
    // Pass the length explicitly: a string_view is not guaranteed to be null-terminated,
    // so handing over the bare pointer could read past the end of the message.
    response.set_content(message.data(), message.size(), MIME::TEXT.typeName().data());
    return true;
}
Expand Down Expand Up @@ -281,9 +282,18 @@ struct Connection {
return ReadLock(_cachedRepliesMutex);
}

void waitForUpdate() {
auto temporaryLock = writeLock();
_pollingIndexCV.wait(temporaryLock);
/**
 * Blocks until `_nextPollingIndex` differs from its value at entry, or until `timeout` has elapsed.
 *
 * @param timeout upper bound on the total time spent waiting
 * @return true if the polling index changed, false if the timeout elapsed without a change
 */
bool waitForUpdate(std::chrono::milliseconds timeout) {
    // This could also periodically check for the client connection being dropped (e.g. due to client-side timeout)
    // if cpp-httplib had an API for that.
    auto       temporaryLock = writeLock();
    const auto next          = _nextPollingIndex;

    // The predicate overload waits against a single deadline: spurious wakeups and notifications that
    // do not change the index cannot restart the timeout, and an index change that races with the
    // deadline is still reported as an update.
    return _pollingIndexCV.wait_for(temporaryLock, timeout, [&] { return _nextPollingIndex != next; });
}

std::size_t cachedRepliesSize(ReadLock & /*lock*/) const {
Expand Down Expand Up @@ -315,16 +325,30 @@ struct Connection {
template<typename Mode, typename VirtualFS, role... Roles>
class RestBackend : public Mode {
protected:
Broker<Roles...> &_broker;
const VirtualFS &_vfs;
URI<> _restAddress;
Broker<Roles...> &_broker;
const VirtualFS &_vfs;
URI<> _restAddress;
std::atomic<std::chrono::milliseconds> _majordomoTimeout = 30000ms;

private:
std::jthread _connectionUpdaterThread;
std::shared_mutex _connectionsMutex;
std::map<std::string, std::unique_ptr<detail::Connection>> _connectionForService;

public:
/**
 * Sets the timeout applied when talking to majordomo workers: the time to wait
 * for notifications on subscriptions (long-polling) and for responses to
 * Get/Set requests.
 *
 * @param timeout new timeout value
 */
void setMajordomoTimeout(std::chrono::milliseconds timeout) {
    _majordomoTimeout.store(timeout);
}

/// Returns the currently configured majordomo timeout.
std::chrono::milliseconds majordomoTimeout() const {
    return _majordomoTimeout.load();
}

using BrokerType = Broker<Roles...>;
// returns a connection with refcount 1. Make sure you lower it to
// zero at some point
Expand Down Expand Up @@ -705,9 +729,9 @@ struct RestBackend<Mode, VirtualFS, Roles...>::RestWorker {
}

pollItem.socket = connection.notificationSubscriptionSocket.zmq_ptr;
auto pollResult = zmq::invoke(zmq_poll, &pollItem, 1, std::chrono::duration_cast<std::chrono::milliseconds>(REST_POLLING_TIME).count());
auto pollResult = zmq::invoke(zmq_poll, &pollItem, 1, std::chrono::duration_cast<std::chrono::milliseconds>(restBackend.majordomoTimeout()).count());
if (!pollResult || pollResult.value() == 0) {
detail::respondWithError(response, "Error: No response from broker\n");
detail::respondWithError(response, "Error: No response from broker\n", HTTP_GATEWAY_TIMEOUT);
} else if (auto responseMessage = zmq::receive<mdp::MessageFormat::WithoutSourceId>(connection.notificationSubscriptionSocket); !responseMessage) {
detail::respondWithError(response, "Error: Empty response from broker\n");
} else if (!responseMessage->error.empty()) {
Expand All @@ -733,14 +757,16 @@ struct RestBackend<Mode, VirtualFS, Roles...>::RestWorker {
const auto subscriptionKey = subscription.toZmqTopic();
auto *connection = restBackend.notificationSubscriptionConnectionFor(subscriptionKey);
assert(connection);

const auto majordomoTimeout = restBackend.majordomoTimeout();

Check warning on line 760 in src/majordomo/include/majordomo/RestBackend.hpp

View check run for this annotation

Codecov / codecov/patch

src/majordomo/include/majordomo/RestBackend.hpp#L760

Added line #L760 was not covered by tests
response.set_header("Access-Control-Allow-Origin", "*");
response.set_chunked_content_provider(
"application/json",
[connection](std::size_t /*offset*/, httplib::DataSink &sink) mutable {
[connection, majordomoTimeout](std::size_t /*offset*/, httplib::DataSink &sink) mutable {

Check warning on line 764 in src/majordomo/include/majordomo/RestBackend.hpp

View check run for this annotation

Codecov / codecov/patch

src/majordomo/include/majordomo/RestBackend.hpp#L764

Added line #L764 was not covered by tests
std::cerr << "Chunked reply...\n";

connection->waitForUpdate();
if (!connection->waitForUpdate(majordomoTimeout)) {
return false;

Check warning on line 768 in src/majordomo/include/majordomo/RestBackend.hpp

View check run for this annotation

Codecov / codecov/patch

src/majordomo/include/majordomo/RestBackend.hpp#L768

Added line #L768 was not covered by tests
}

auto connectionCacheLock = connection->readLock();
auto lastIndex = connection->nextPollingIndex(connectionCacheLock) - 1;
Expand Down Expand Up @@ -864,7 +890,9 @@ struct RestBackend<Mode, VirtualFS, Roles...>::RestWorker {
// Since we use KeepAlive object, the inital refCount can go away
connection->decreaseReferenceCount();

connection->waitForUpdate();
if (!connection->waitForUpdate(restBackend.majordomoTimeout())) {
return detail::respondWithError(response, "Timeout waiting for update", HTTP_GATEWAY_TIMEOUT);
}

const auto newCache = fetchCache();

Expand Down
71 changes: 71 additions & 0 deletions src/majordomo/test/majordomoworker_rest_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,29 @@ class PathWorker : public majordomo::Worker<serviceName, PathContext, majordomo:
}
};

// Context type used by WaitingWorker for both requests and replies.
struct WaitingContext {
// Number of milliseconds the WaitingWorker callback sleeps before it replies.
int32_t timeoutMs = 0;
// Content type of the exchange; JSON unless set otherwise.
opencmw::MIME::MimeType contentType = opencmw::MIME::JSON;
};
// Enables reflection for both fields of WaitingContext.
ENABLE_REFLECTION_FOR(WaitingContext, timeoutMs, contentType)

/**
 * Test worker that delays every reply: the callback sleeps for the number of
 * milliseconds given in the request context, copies that context to the reply
 * and echoes the request value back.
 */
template<units::basic_fixed_string serviceName, typename... Meta>
class WaitingWorker : public majordomo::Worker<serviceName, WaitingContext, SingleString, SingleString, Meta...> {
public:
    using super_t = majordomo::Worker<serviceName, WaitingContext, SingleString, SingleString, Meta...>;

    template<typename BrokerType>
    explicit WaitingWorker(const BrokerType &broker)
        : super_t(broker, {}) {
        super_t::setCallback([](majordomo::RequestContext & /*rawCtx*/, const WaitingContext &requestCtx, const SingleString &request, WaitingContext &replyCtx, SingleString &reply) {
            fmt::println("Sleep for {}", requestCtx.timeoutMs);

            // Hold the reply back for as long as the client asked.
            const std::chrono::milliseconds delay{ requestCtx.timeoutMs };
            std::this_thread::sleep_for(delay);

            replyCtx    = requestCtx;
            reply.value = fmt::format("You said: {}", request.value);
        });
    }
};

TEST_CASE("Simple MajordomoWorker example showing its usage", "[majordomo][majordomoworker][simple_example]") {
// We run both broker and worker inproc
majordomo::Broker broker("/TestBroker", testSettings());
Expand Down Expand Up @@ -245,3 +268,51 @@ TEST_CASE("Subscriptions", "[majordomo][majordomoworker][subscription]") {
std::ranges::sort(subscriptions);
REQUIRE(subscriptions == std::vector<std::string>{ "/colors", "/colors?blue&green&red", "/colors?green&red", "/colors?red" });
}

// Checks that the REST backend applies the configured majordomo timeout, both to
// long-poll GET requests and to POST requests answered by a slow worker.
TEST_CASE("Majordomo timeouts", "[majordomo][majordomoworker][rest]") {
majordomo::Broker broker("/TestBroker", testSettings());
auto fs = cmrc::assets::get_filesystem();
FileServerRestBackend<majordomo::PLAIN_HTTP, decltype(fs)> rest(broker, fs);
RunInThread restServerRun(rest);

opencmw::query::registerTypes(WaitingContext(), broker);

WaitingWorker<"/waiter"> worker(broker);

RunInThread brokerRun(broker);
RunInThread workerRun(worker);

REQUIRE(waitUntilWorkerServiceAvailable(broker.context, worker));

// set timeout to unit-test friendly interval
rest.setMajordomoTimeout(800ms);

SECTION("Waiting for notification that doesn't happen in time returns 504 message") {
// Issue 16 long-poll requests at once; no notification is ever sent in this section.
// NOTE(review): the helper is not shown here; presumably it checks that the body contains
// the given strings and that the status is one of the given codes.
// The std::jthread objects join when the vector goes out of scope at the end of the section.
std::vector<std::jthread> clientThreads;
for (int i = 0; i < 16; ++i) {
clientThreads.push_back(makeGetRequestResponseCheckerThread("/waiter?LongPollingIdx=Next", { "Timeout" }, { 504 }));
}
}

SECTION("Waiting for notification that happens in time gives expected response") {
auto client = makeGetRequestResponseCheckerThread("/waiter?LongPollingIdx=Next", { "This is a notification" });
// Notify after 400ms, i.e. before the 800ms timeout configured above expires.
std::this_thread::sleep_for(400ms);
worker.notify({}, { "This is a notification" });
}

SECTION("Response to request takes too long, timeout status is returned") {
httplib::Client postData{ "http://localhost:8080" };
// timeoutMs=1200 makes the worker sleep longer than the 800ms timeout configured above.
auto reply = postData.Post("/waiter?contentType=application%2Fjson&timeoutMs=1200", "{\"value\": \"Hello!\"}", "application/json");
REQUIRE(reply);
REQUIRE(reply->status == 504);
REQUIRE(reply->body.find("No response") != std::string::npos);
}

SECTION("Response to request arrives in time") {
httplib::Client postData{ "http://localhost:8080" };
// timeoutMs=0: the worker replies without sleeping, so the echo is expected with status 200.
auto reply = postData.Post("/waiter?contentType=application%2Fjson&timeoutMs=0", "{\"value\": \"Hello!\"}", "application/json");
REQUIRE(reply);
REQUIRE(reply->status == 200);
REQUIRE(reply->body.find("You said: Hello!") != std::string::npos);
}
}

0 comments on commit 391b2df

Please sign in to comment.