diff --git a/src/engine/Server.cpp b/src/engine/Server.cpp index 87ca3897fc..f9ffdbf7f1 100644 --- a/src/engine/Server.cpp +++ b/src/engine/Server.cpp @@ -11,6 +11,7 @@ #include #include +#include "engine/ExecuteUpdate.h" #include "engine/ExportQueryExecutionTrees.h" #include "engine/QueryPlanner.h" #include "global/RuntimeParameters.h" @@ -43,7 +44,7 @@ Server::Server(unsigned short port, size_t numThreads, enablePatternTrick_(usePatternTrick), // The number of server threads currently also is the number of queries // that can be processed simultaneously. - threadPool_{numThreads} { + queryThreadPool_{numThreads} { // This also directly triggers the update functions and propagates the // values of the parameters to the cache. RuntimeParameters().setOnUpdateAction<"cache-max-num-entries">( @@ -394,6 +395,25 @@ Awaitable Server::process( logCommand(cmd, "clear cache completely (including unpinned elements)"); cache_.clearAll(); response = createJsonResponse(composeCacheStatsJson(), request); + } else if (auto cmd = checkParameter("cmd", "clear-delta-triples")) { + requireValidAccessToken("clear-delta-triples"); + logCommand(cmd, "clear delta triples"); + // The function requires a SharedCancellationHandle, but the operation is + // not cancellable. + auto handle = std::make_shared>(); + // We don't directly `co_await` because of lifetime issues (bugs) in the + // Conan setup. + auto coroutine = computeInNewThread( + updateThreadPool_, + [this] { + // Use `this` explicitly to silence false-positive errors on the + // captured `this` being unused. + this->index_.deltaTriplesManager().clear(); + }, + handle); + co_await std::move(coroutine); + response = createOkResponse("Delta triples have been cleared", request, + MediaType::textPlain); } else if (auto cmd = checkParameter("cmd", "get-settings")) { logCommand(cmd, "get server settings"); response = createJsonResponse(RuntimeParameters().toMap(), request); @@ -470,9 +490,10 @@ Awaitable Server::process( }; auto visitUpdate = [&checkParameter, &accessTokenOk, &request, &send, ¶meters, - &requestTimer, - this](const ad_utility::url_parser::sparqlOperation::Update& update) + &requestTimer, this, &requireValidAccessToken]( + const ad_utility::url_parser::sparqlOperation::Update& update) -> Awaitable { + requireValidAccessToken("SPARQL Update"); if (auto timeLimit = co_await verifyUserSubmittedQueryTimeout( checkParameter("timeout", std::nullopt), accessTokenOk, request, send)) { @@ -521,17 +542,17 @@ std::pair Server::determineResultPinning( checkParameter(params, "pinresult", "true").has_value(); return {pinSubtrees, pinResult}; } + // ____________________________________________________________________________ -Awaitable Server::setupPlannedQuery( +Server::PlannedQuery Server::setupPlannedQuery( const ad_utility::url_parser::ParamValueMap& params, const std::string& operation, QueryExecutionContext& qec, SharedCancellationHandle handle, TimeLimit timeLimit, - const ad_utility::Timer& requestTimer) { + const ad_utility::Timer& requestTimer) const { auto queryDatasets = ad_utility::url_parser::parseDatasetClauses(params); - std::optional plannedQuery = - co_await parseAndPlan(operation, queryDatasets, qec, handle, timeLimit); - AD_CORRECTNESS_CHECK(plannedQuery.has_value()); - auto& qet = plannedQuery.value().queryExecutionTree_; + PlannedQuery plannedQuery = + parseAndPlan(operation, queryDatasets, qec, handle, timeLimit); + auto& qet = plannedQuery.queryExecutionTree_; qet.isRoot() = true; // allow pinning of the final result auto timeForQueryPlanning = requestTimer.msecs(); auto& runtimeInfoWholeQuery = @@ -541,7 +562,7 @@ Awaitable Server::setupPlannedQuery( << " ms" << std::endl; LOG(TRACE) << qet.getCacheKey() << std::endl; - co_return std::move(plannedQuery.value()); + return plannedQuery; } // _____________________________________________________________________________ json Server::composeErrorResponseJson( @@ -751,6 +772,18 @@ MediaType Server::determineMediaType( return mediaType.value(); } +// ____________________________________________________________________________ +ad_utility::websocket::MessageSender Server::createMessageSender( + const std::weak_ptr& queryHub, + const ad_utility::httpUtils::HttpRequest auto& request, + const string& operation) { + auto queryHubLock = queryHub.lock(); + AD_CORRECTNESS_CHECK(queryHubLock); + ad_utility::websocket::MessageSender messageSender{ + getQueryId(request, operation), *queryHubLock}; + return messageSender; +} + // ____________________________________________________________________________ Awaitable Server::processQuery( const ad_utility::url_parser::ParamValueMap& params, const string& query, @@ -761,11 +794,8 @@ Awaitable Server::processQuery( LOG(INFO) << "Requested media type of result is \"" << ad_utility::toString(mediaType) << "\"" << std::endl; - auto queryHub = queryHub_.lock(); - AD_CORRECTNESS_CHECK(queryHub); - ad_utility::websocket::MessageSender messageSender{getQueryId(request, query), - *queryHub}; - + ad_utility::websocket::MessageSender messageSender = + createMessageSender(queryHub_, request, query); auto [cancellationHandle, cancelTimeoutOnDestruction] = setupCancellationHandle(messageSender.getQueryId(), timeLimit); @@ -779,15 +809,35 @@ Awaitable Server::processQuery( QueryExecutionContext qec(index_, &cache_, allocator_, sortPerformanceEstimator_, std::ref(messageSender), pinSubtrees, pinResult); - auto plannedQuery = co_await setupPlannedQuery( - params, query, qec, cancellationHandle, timeLimit, requestTimer); + + // The usage of an `optional` here is required because of a limitation in + // Boost::Asio which forces us to use default-constructible result types with + // `computeInNewThread`. We also can't unwrap the optional directly in this + // function, because then the conan build fails in a very strange way, + // probably related to issues in GCC's coroutine implementation. + // For the same reason (crashes in the conanbuild) we store the coroutine in + // an explicit variable instead of directly `co_await`-ing it. + auto coroutine = computeInNewThread( + queryThreadPool_, + [this, ¶ms, &query, &qec, cancellationHandle, &timeLimit, + &requestTimer]() -> std::optional { + return setupPlannedQuery(params, query, qec, cancellationHandle, + timeLimit, requestTimer); + }, + cancellationHandle); + auto plannedQueryOpt = co_await std::move(coroutine); + AD_CORRECTNESS_CHECK(plannedQueryOpt.has_value()); + auto plannedQuery = std::move(plannedQueryOpt).value(); auto qet = plannedQuery.queryExecutionTree_; if (plannedQuery.parsedQuery_.hasUpdateClause()) { // This may be caused by a bug (the code is not yet tested well) or by an // attack which tries to circumvent (not yet existing) access controls for // Update. - throw std::runtime_error("Expected normal query but received update query"); + throw std::runtime_error( + absl::StrCat("SPARQL QUERY was request via the HTTP request, but the " + "following update was sent instead of an update: ", + plannedQuery.parsedQuery_._originalString)); } // Read the export limit from the send` parameter (historical name). This @@ -833,6 +883,80 @@ Awaitable Server::processQuery( co_return; } +// ____________________________________________________________________________ +void Server::processUpdateImpl( + const ad_utility::url_parser::ParamValueMap& params, const string& update, + ad_utility::Timer& requestTimer, TimeLimit timeLimit, auto& messageSender, + ad_utility::SharedCancellationHandle cancellationHandle, + DeltaTriples& deltaTriples) { + auto [pinSubtrees, pinResult] = determineResultPinning(params); + LOG(INFO) << "Processing the following SPARQL update:" + << (pinResult ? " [pin result]" : "") + << (pinSubtrees ? " [pin subresults]" : "") << "\n" + << update << std::endl; + QueryExecutionContext qec(index_, &cache_, allocator_, + sortPerformanceEstimator_, std::ref(messageSender), + pinSubtrees, pinResult); + auto plannedQuery = setupPlannedQuery(params, update, qec, cancellationHandle, + timeLimit, requestTimer); + auto qet = plannedQuery.queryExecutionTree_; + + if (!plannedQuery.parsedQuery_.hasUpdateClause()) { + throw std::runtime_error( + absl::StrCat("SPARQL UPDATE was request via the HTTP request, but the " + "following query was sent instead of an update: ", + plannedQuery.parsedQuery_._originalString)); + } + ExecuteUpdate::executeUpdate(index_, plannedQuery.parsedQuery_, qet, + deltaTriples, cancellationHandle); + + LOG(INFO) << "Done processing update" + << ", total time was " << requestTimer.msecs().count() << " ms" + << std::endl; + LOG(DEBUG) << "Runtime Info:\n" + << qet.getRootOperation()->runtimeInfo().toString() << std::endl; +} + +// ____________________________________________________________________________ +Awaitable Server::processUpdate( + const ad_utility::url_parser::ParamValueMap& params, const string& update, + ad_utility::Timer& requestTimer, + const ad_utility::httpUtils::HttpRequest auto& request, auto&& send, + TimeLimit timeLimit) { + auto messageSender = createMessageSender(queryHub_, request, update); + + auto [cancellationHandle, cancelTimeoutOnDestruction] = + setupCancellationHandle(messageSender.getQueryId(), timeLimit); + + // Update the delta triples. + + // Note: We don't directly `co_await` because of lifetime issues (probably + // bugs in GCC or Boost) that occur in the Conan build. + auto coroutine = computeInNewThread( + updateThreadPool_, + [this, ¶ms, &update, &requestTimer, &timeLimit, &messageSender, + &cancellationHandle] { + index_.deltaTriplesManager().modify( + [this, ¶ms, &update, &requestTimer, &timeLimit, &messageSender, + &cancellationHandle](auto& deltaTriples) { + // Use `this` explicitly to silence false-positive errors on + // captured `this` being unused. + this->processUpdateImpl(params, update, requestTimer, timeLimit, + messageSender, cancellationHandle, + deltaTriples); + }); + }, + cancellationHandle); + co_await std::move(coroutine); + + // TODO send a proper response + // SPARQL 1.1 Protocol 2.2.4 Successful Responses: "The response body of a + // successful update request is implementation defined." + co_await send(ad_utility::httpUtils::createOkResponse( + "Update successful", request, MediaType::textPlain)); + co_return; +} + // ____________________________________________________________________________ template Awaitable Server::processQueryOrUpdate( @@ -858,8 +982,8 @@ Awaitable Server::processQueryOrUpdate( co_await processQuery(params, queryOrUpdate, requestTimer, request, send, timeLimit); } else { - throw std::runtime_error( - "SPARQL 1.1 Update is currently not supported by QLever."); + co_await processUpdate(params, queryOrUpdate, requestTimer, request, send, + timeLimit); } } catch (const ParseException& e) { responseStatus = http::status::bad_request; @@ -894,7 +1018,7 @@ Awaitable Server::processQueryOrUpdate( exceptionErrorMsg.value().append(absl::StrCat( " Highlighting an error for the command line log failed: ", e.what())); - LOG(ERROR) << "Failed to highlight error in query. " << e.what() + LOG(ERROR) << "Failed to highlight error in operation. " << e.what() << std::endl; LOG(ERROR) << metadata.value().query_ << std::endl; } @@ -915,7 +1039,8 @@ Awaitable Server::processQueryOrUpdate( // _____________________________________________________________________________ template -Awaitable Server::computeInNewThread(Function function, +Awaitable Server::computeInNewThread(net::static_thread_pool& threadPool, + Function function, SharedCancellationHandle handle) { // `interruptible` will set the shared state of this promise // with a function that can be used to cancel the timer. @@ -935,48 +1060,35 @@ Awaitable Server::computeInNewThread(Function function, // this might still block. However it will make the code check the // cancellation handle while waiting for a thread in the pool to become ready. return ad_utility::interruptible( - ad_utility::runFunctionOnExecutor(threadPool_.get_executor(), + ad_utility::runFunctionOnExecutor(threadPool.get_executor(), std::move(inner), net::use_awaitable), std::move(handle), std::move(cancelTimerPromise)); } // _____________________________________________________________________________ -net::awaitable> Server::parseAndPlan( +Server::PlannedQuery Server::parseAndPlan( const std::string& query, const vector& queryDatasets, QueryExecutionContext& qec, SharedCancellationHandle handle, - TimeLimit timeLimit) { - auto handleCopy = handle; - - // The usage of an `optional` here is required because of a limitation in - // Boost::Asio which forces us to use default-constructible result types with - // `computeInNewThread`. We also can't unwrap the optional directly in this - // function, because then the conan build fails in a very strange way, - // probably related to issues in GCC's coroutine implementation. - return computeInNewThread( - [&query, &qec, enablePatternTrick = enablePatternTrick_, - handle = std::move(handle), timeLimit, - &queryDatasets]() mutable -> std::optional { - auto pq = SparqlParser::parseQuery(query); - handle->throwIfCancelled(); - // SPARQL Protocol 2.1.4 specifies that the dataset from the query - // parameters overrides the dataset from the query itself. - if (!queryDatasets.empty()) { - pq.datasetClauses_ = - parsedQuery::DatasetClauses::fromClauses(queryDatasets); - } - QueryPlanner qp(&qec, handle); - qp.setEnablePatternTrick(enablePatternTrick); - auto qet = qp.createExecutionTree(pq); - handle->throwIfCancelled(); - PlannedQuery plannedQuery{std::move(pq), std::move(qet)}; - - plannedQuery.queryExecutionTree_.getRootOperation() - ->recursivelySetCancellationHandle(std::move(handle)); - plannedQuery.queryExecutionTree_.getRootOperation() - ->recursivelySetTimeConstraint(timeLimit); - return plannedQuery; - }, - std::move(handleCopy)); + TimeLimit timeLimit) const { + auto pq = SparqlParser::parseQuery(query); + handle->throwIfCancelled(); + // SPARQL Protocol 2.1.4 specifies that the dataset from the query + // parameters overrides the dataset from the query itself. + if (!queryDatasets.empty()) { + pq.datasetClauses_ = + parsedQuery::DatasetClauses::fromClauses(queryDatasets); + } + QueryPlanner qp(&qec, handle); + qp.setEnablePatternTrick(enablePatternTrick_); + auto qet = qp.createExecutionTree(pq); + handle->throwIfCancelled(); + PlannedQuery plannedQuery{std::move(pq), std::move(qet)}; + + plannedQuery.queryExecutionTree_.getRootOperation() + ->recursivelySetCancellationHandle(std::move(handle)); + plannedQuery.queryExecutionTree_.getRootOperation() + ->recursivelySetTimeConstraint(timeLimit); + return plannedQuery; } // _____________________________________________________________________________ diff --git a/src/engine/Server.h b/src/engine/Server.h index e7e514adf1..3ccc070cb7 100644 --- a/src/engine/Server.h +++ b/src/engine/Server.h @@ -32,6 +32,8 @@ using std::vector; //! The HTTP Server used. class Server { FRIEND_TEST(ServerTest, parseHttpRequest); + FRIEND_TEST(ServerTest, getQueryId); + FRIEND_TEST(ServerTest, createMessageSender); public: explicit Server(unsigned short port, size_t numThreads, @@ -76,7 +78,8 @@ class Server { /// the `WebSocketHandler` created for `HttpServer`. std::weak_ptr queryHub_; - net::static_thread_pool threadPool_; + net::static_thread_pool queryThreadPool_; + net::static_thread_pool updateThreadPool_{1}; /// Executor with a single thread that is used to run timers asynchronously. net::static_thread_pool timerExecutor_{1}; @@ -147,6 +150,12 @@ class Server { ad_utility::Timer& requestTimer, const ad_utility::httpUtils::HttpRequest auto& request, auto&& send, TimeLimit timeLimit); + // Do the actual execution of an update. + Awaitable processUpdate( + const ad_utility::url_parser::ParamValueMap& params, const string& update, + ad_utility::Timer& requestTimer, + const ad_utility::httpUtils::HttpRequest auto& request, auto&& send, + TimeLimit timeLimit); // Determine the media type to be used for the result. The media type is // determined (in this order) by the current action (e.g., @@ -160,11 +169,23 @@ class Server { const ad_utility::url_parser::ParamValueMap& params); FRIEND_TEST(ServerTest, determineResultPinning); // Sets up the PlannedQuery s.t. it is ready to be executed. - Awaitable setupPlannedQuery( + PlannedQuery setupPlannedQuery( const ad_utility::url_parser::ParamValueMap& params, const std::string& operation, QueryExecutionContext& qec, SharedCancellationHandle handle, TimeLimit timeLimit, - const ad_utility::Timer& requestTimer); + const ad_utility::Timer& requestTimer) const; + // Creates a `MessageSender` for the given operation. + ad_utility::websocket::MessageSender createMessageSender( + const std::weak_ptr& queryHub, + const ad_utility::httpUtils::HttpRequest auto& request, + const string& operation); + // Execute an update operation. The function must have exclusive access to the + // DeltaTriples object. + void processUpdateImpl( + const ad_utility::url_parser::ParamValueMap& params, const string& update, + ad_utility::Timer& requestTimer, TimeLimit timeLimit, auto& messageSender, + ad_utility::SharedCancellationHandle cancellationHandle, + DeltaTriples& deltaTriples); static json composeErrorResponseJson( const string& query, const std::string& errorMsg, @@ -176,10 +197,11 @@ class Server { json composeCacheStatsJson() const; /// Invoke `function` on `threadPool_`, and return an awaitable to wait for - /// it's completion, wrapping the result. + /// its completion, wrapping the result. template > - Awaitable computeInNewThread(Function function, + Awaitable computeInNewThread(net::static_thread_pool& threadPool, + Function function, SharedCancellationHandle handle); /// This method extracts a client-defined query id from the passed HTTP @@ -210,13 +232,11 @@ class Server { /// Run the SPARQL parser and then the query planner on the `query`. All /// computation is performed on the `threadPool_`. - /// Note: This function *never* returns `nullopt`. It either returns a value - /// or throws an exception. We still need to return an `optional` though for - /// technical reasons that are described in the definition of this function. - net::awaitable> parseAndPlan( - const std::string& query, const vector& queryDatasets, - QueryExecutionContext& qec, SharedCancellationHandle handle, - TimeLimit timeLimit); + PlannedQuery parseAndPlan(const std::string& query, + const vector& queryDatasets, + QueryExecutionContext& qec, + SharedCancellationHandle handle, + TimeLimit timeLimit) const; /// Acquire the `CancellationHandle` for the given `QueryId`, start the /// watchdog and call `cancelAfterDeadline` to set the timeout after diff --git a/src/index/DeltaTriples.cpp b/src/index/DeltaTriples.cpp index 0d2ac3bac9..2c1e717c60 100644 --- a/src/index/DeltaTriples.cpp +++ b/src/index/DeltaTriples.cpp @@ -212,6 +212,9 @@ void DeltaTriplesManager::modify( }); } +// _____________________________________________________________________________ +void DeltaTriplesManager::clear() { modify(&DeltaTriples::clear); } + // _____________________________________________________________________________ SharedLocatedTriplesSnapshot DeltaTriplesManager::getCurrentSnapshot() const { return *currentLocatedTriplesSnapshot_.rlock(); diff --git a/src/index/DeltaTriples.h b/src/index/DeltaTriples.h index 329bd1333b..7d0cd7f4e9 100644 --- a/src/index/DeltaTriples.h +++ b/src/index/DeltaTriples.h @@ -196,11 +196,16 @@ class DeltaTriplesManager { FRIEND_TEST(DeltaTriplesTest, DeltaTriplesManager); // Modify the underlying `DeltaTriples` by applying `function` and then update - // the current snapshot. Concurrent calls to `modify` will be serialized, and - // each call to `getCurrentSnapshot` will either return the snapshot before or - // after a modification, but never one of an ongoing modification. + // the current snapshot. Concurrent calls to `modify` and `clear` will be + // serialized, and each call to `getCurrentSnapshot` will either return the + // snapshot before or after a modification, but never one of an ongoing + // modification. void modify(const std::function& function); + // Reset the updates represented by the underlying `DeltaTriples` and then + // update the current snapshot. + void clear(); + // Return a shared pointer to a deep copy of the current snapshot. This can // be safely used to execute a query without interfering with future updates. SharedLocatedTriplesSnapshot getCurrentSnapshot() const; diff --git a/src/util/CancellationHandle.h b/src/util/CancellationHandle.h index 0e62a01c61..28a445684c 100644 --- a/src/util/CancellationHandle.h +++ b/src/util/CancellationHandle.h @@ -98,8 +98,8 @@ class CancellationException : public std::exception { : message_{std::move(message)} {} explicit CancellationException(CancellationState reason) : message_{reason == CancellationState::TIMEOUT - ? "Query timed out." - : "Query was manually cancelled."} { + ? "Operation timed out." + : "Operation was manually cancelled."} { AD_CONTRACT_CHECK(detail::isCancelled(reason)); } diff --git a/test/ServerTest.cpp b/test/ServerTest.cpp index 5ed589fc42..292d77f12b 100644 --- a/test/ServerTest.cpp +++ b/test/ServerTest.cpp @@ -27,27 +27,27 @@ auto ParsedRequestIs = [](const std::string& path, AD_FIELD(ad_utility::url_parser::ParsedRequest, operation_, testing::Eq(operation))); }; +auto MakeBasicRequest = [](http::verb method, const std::string& target) { + // version 11 stands for HTTP/1.1 + return http::request{method, target, 11}; +}; +auto MakeGetRequest = [](const std::string& target) { + return MakeBasicRequest(http::verb::get, target); +}; +auto MakePostRequest = [](const std::string& target, + const std::string& contentType, + const std::string& body) { + auto req = MakeBasicRequest(http::verb::post, target); + req.set(http::field::content_type, contentType); + req.body() = body; + req.prepare_payload(); + return req; +}; } // namespace TEST(ServerTest, parseHttpRequest) { namespace http = boost::beast::http; - auto MakeBasicRequest = [](http::verb method, const std::string& target) { - // version 11 stands for HTTP/1.1 - return http::request{method, target, 11}; - }; - auto MakeGetRequest = [&MakeBasicRequest](const std::string& target) { - return MakeBasicRequest(http::verb::get, target); - }; - auto MakePostRequest = [&MakeBasicRequest](const std::string& target, - const std::string& contentType, - const std::string& body) { - auto req = MakeBasicRequest(http::verb::post, target); - req.set(http::field::content_type, contentType); - req.body() = body; - req.prepare_payload(); - return req; - }; auto parse = [](const ad_utility::httpUtils::HttpRequest auto& request) { return Server::parseHttpRequest(request); }; @@ -222,3 +222,65 @@ TEST(ServerTest, determineMediaType) { EXPECT_THAT(Server::determineMediaType({}, MakeRequest("")), testing::Eq(ad_utility::MediaType::sparqlJson)); } + +TEST(ServerTest, getQueryId) { + using namespace ad_utility::websocket; + Server server{9999, 1, ad_utility::MemorySize::megabytes(1), "accessToken"}; + auto reqWithExplicitQueryId = MakeGetRequest("/"); + reqWithExplicitQueryId.set("Query-Id", "100"); + const auto req = MakeGetRequest("/"); + { + // A request with a custom query id. + auto queryId1 = server.getQueryId(reqWithExplicitQueryId, + "SELECT * WHERE { ?a ?b ?c }"); + // Another request with the same custom query id. This throws an error, + // because query id cannot be used for multiple queries at the same time. + AD_EXPECT_THROW_WITH_MESSAGE( + server.getQueryId(reqWithExplicitQueryId, + "SELECT * WHERE { ?a ?b ?c }"), + testing::HasSubstr("Query id '100' is already in use!")); + } + // The custom query id can be reused, once the query is finished. + auto queryId1 = + server.getQueryId(reqWithExplicitQueryId, "SELECT * WHERE { ?a ?b ?c }"); + // Without custom query ids, unique ids are generated. + auto queryId2 = server.getQueryId(req, "SELECT * WHERE { ?a ?b ?c }"); + auto queryId3 = server.getQueryId(req, "SELECT * WHERE { ?a ?b ?c }"); +} + +TEST(ServerTest, createMessageSender) { + Server server{9999, 1, ad_utility::MemorySize::megabytes(1), "accessToken"}; + auto reqWithExplicitQueryId = MakeGetRequest("/"); + std::string customQueryId = "100"; + reqWithExplicitQueryId.set("Query-Id", customQueryId); + const auto req = MakeGetRequest("/"); + // The query hub is only valid once, the server has been started. + AD_EXPECT_THROW_WITH_MESSAGE( + server.createMessageSender(server.queryHub_, req, + "SELECT * WHERE { ?a ?b ?c }"), + testing::HasSubstr("Assertion `queryHubLock` failed.")); + { + // Set a dummy query hub. + boost::asio::io_context io_context; + auto queryHub = + std::make_shared(io_context); + server.queryHub_ = queryHub; + // MessageSenders are created normally. + server.createMessageSender(server.queryHub_, req, + "SELECT * WHERE { ?a ?b ?c }"); + server.createMessageSender(server.queryHub_, req, + "INSERT DATA { }"); + EXPECT_THAT( + server.createMessageSender(server.queryHub_, reqWithExplicitQueryId, + "INSERT DATA { }"), + AD_PROPERTY(ad_utility::websocket::MessageSender, getQueryId, + testing::Eq(ad_utility::websocket::QueryId::idFromString( + customQueryId)))); + } + // Once the query hub expires (e.g. because the io context dies), message + // senders can no longer be created. + AD_EXPECT_THROW_WITH_MESSAGE( + server.createMessageSender(server.queryHub_, req, + "SELECT * WHERE { ?a ?b ?c }"), + testing::HasSubstr("Assertion `queryHubLock` failed.")); +}