Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use DeltaTriplesManager to process update request #1608

Merged
merged 19 commits into from
Nov 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
226 changes: 169 additions & 57 deletions src/engine/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <string>
#include <vector>

#include "engine/ExecuteUpdate.h"
#include "engine/ExportQueryExecutionTrees.h"
#include "engine/QueryPlanner.h"
#include "global/RuntimeParameters.h"
Expand Down Expand Up @@ -43,7 +44,7 @@
enablePatternTrick_(usePatternTrick),
// The number of server threads currently also is the number of queries
// that can be processed simultaneously.
threadPool_{numThreads} {
queryThreadPool_{numThreads} {
// This also directly triggers the update functions and propagates the
// values of the parameters to the cache.
RuntimeParameters().setOnUpdateAction<"cache-max-num-entries">(
Expand Down Expand Up @@ -394,6 +395,25 @@
logCommand(cmd, "clear cache completely (including unpinned elements)");
cache_.clearAll();
response = createJsonResponse(composeCacheStatsJson(), request);
} else if (auto cmd = checkParameter("cmd", "clear-delta-triples")) {
requireValidAccessToken("clear-delta-triples");
logCommand(cmd, "clear delta triples");

Check warning on line 400 in src/engine/Server.cpp

View check run for this annotation

Codecov / codecov/patch

src/engine/Server.cpp#L399-L400

Added lines #L399 - L400 were not covered by tests
// The function requires a SharedCancellationHandle, but the operation is
// not cancellable.
auto handle = std::make_shared<ad_utility::CancellationHandle<>>();

Check warning on line 403 in src/engine/Server.cpp

View check run for this annotation

Codecov / codecov/patch

src/engine/Server.cpp#L403

Added line #L403 was not covered by tests
// We don't directly `co_await` because of lifetime issues (bugs) in the
// Conan setup.
auto coroutine = computeInNewThread(
updateThreadPool_,
[this] {

Check warning on line 408 in src/engine/Server.cpp

View check run for this annotation

Codecov / codecov/patch

src/engine/Server.cpp#L406-L408

Added lines #L406 - L408 were not covered by tests
// Use `this` explicitly to silence false-positive errors on the
// captured `this` being unused.
this->index_.deltaTriplesManager().clear();
},
handle);
co_await std::move(coroutine);
response = createOkResponse("Delta triples have been cleared", request,
MediaType::textPlain);

Check warning on line 416 in src/engine/Server.cpp

View check run for this annotation

Codecov / codecov/patch

src/engine/Server.cpp#L411-L416

Added lines #L411 - L416 were not covered by tests
} else if (auto cmd = checkParameter("cmd", "get-settings")) {
logCommand(cmd, "get server settings");
response = createJsonResponse(RuntimeParameters().toMap(), request);
Expand Down Expand Up @@ -470,9 +490,10 @@
};
auto visitUpdate =
[&checkParameter, &accessTokenOk, &request, &send, &parameters,
&requestTimer,
this](const ad_utility::url_parser::sparqlOperation::Update& update)
&requestTimer, this, &requireValidAccessToken](
const ad_utility::url_parser::sparqlOperation::Update& update)

Check warning on line 494 in src/engine/Server.cpp

View check run for this annotation

Codecov / codecov/patch

src/engine/Server.cpp#L493-L494

Added lines #L493 - L494 were not covered by tests
-> Awaitable<void> {
requireValidAccessToken("SPARQL Update");

Check warning on line 496 in src/engine/Server.cpp

View check run for this annotation

Codecov / codecov/patch

src/engine/Server.cpp#L496

Added line #L496 was not covered by tests
if (auto timeLimit = co_await verifyUserSubmittedQueryTimeout(
checkParameter("timeout", std::nullopt), accessTokenOk, request,
send)) {
Expand Down Expand Up @@ -521,17 +542,17 @@
checkParameter(params, "pinresult", "true").has_value();
return {pinSubtrees, pinResult};
}

// ____________________________________________________________________________
Awaitable<Server::PlannedQuery> Server::setupPlannedQuery(
Server::PlannedQuery Server::setupPlannedQuery(
const ad_utility::url_parser::ParamValueMap& params,
const std::string& operation, QueryExecutionContext& qec,
SharedCancellationHandle handle, TimeLimit timeLimit,
const ad_utility::Timer& requestTimer) {
const ad_utility::Timer& requestTimer) const {

Check warning on line 551 in src/engine/Server.cpp

View check run for this annotation

Codecov / codecov/patch

src/engine/Server.cpp#L551

Added line #L551 was not covered by tests
auto queryDatasets = ad_utility::url_parser::parseDatasetClauses(params);
std::optional<PlannedQuery> plannedQuery =
co_await parseAndPlan(operation, queryDatasets, qec, handle, timeLimit);
AD_CORRECTNESS_CHECK(plannedQuery.has_value());
auto& qet = plannedQuery.value().queryExecutionTree_;
PlannedQuery plannedQuery =
parseAndPlan(operation, queryDatasets, qec, handle, timeLimit);
auto& qet = plannedQuery.queryExecutionTree_;

Check warning on line 555 in src/engine/Server.cpp

View check run for this annotation

Codecov / codecov/patch

src/engine/Server.cpp#L553-L555

Added lines #L553 - L555 were not covered by tests
qet.isRoot() = true; // allow pinning of the final result
auto timeForQueryPlanning = requestTimer.msecs();
auto& runtimeInfoWholeQuery =
Expand All @@ -541,7 +562,7 @@
<< " ms" << std::endl;
LOG(TRACE) << qet.getCacheKey() << std::endl;

co_return std::move(plannedQuery.value());
return plannedQuery;

Check warning on line 565 in src/engine/Server.cpp

View check run for this annotation

Codecov / codecov/patch

src/engine/Server.cpp#L565

Added line #L565 was not covered by tests
}
// _____________________________________________________________________________
json Server::composeErrorResponseJson(
Expand Down Expand Up @@ -751,6 +772,18 @@
return mediaType.value();
}

// ____________________________________________________________________________
// Build the `MessageSender` for a single request. The weakly referenced
// query hub is locked first; it is asserted to still be alive. The sender is
// then constructed from the id returned by `getQueryId(request, operation)`
// and the locked hub.
ad_utility::websocket::MessageSender Server::createMessageSender(
    const std::weak_ptr<ad_utility::websocket::QueryHub>& queryHub,
    const ad_utility::httpUtils::HttpRequest auto& request,
    const string& operation) {
  auto hub = queryHub.lock();
  AD_CORRECTNESS_CHECK(hub);
  return ad_utility::websocket::MessageSender{getQueryId(request, operation),
                                              *hub};
}

// ____________________________________________________________________________
Awaitable<void> Server::processQuery(
const ad_utility::url_parser::ParamValueMap& params, const string& query,
Expand All @@ -761,11 +794,8 @@
LOG(INFO) << "Requested media type of result is \""
<< ad_utility::toString(mediaType) << "\"" << std::endl;

auto queryHub = queryHub_.lock();
AD_CORRECTNESS_CHECK(queryHub);
ad_utility::websocket::MessageSender messageSender{getQueryId(request, query),
*queryHub};

ad_utility::websocket::MessageSender messageSender =
createMessageSender(queryHub_, request, query);

Check warning on line 798 in src/engine/Server.cpp

View check run for this annotation

Codecov / codecov/patch

src/engine/Server.cpp#L797-L798

Added lines #L797 - L798 were not covered by tests
auto [cancellationHandle, cancelTimeoutOnDestruction] =
setupCancellationHandle(messageSender.getQueryId(), timeLimit);

Expand All @@ -779,15 +809,35 @@
QueryExecutionContext qec(index_, &cache_, allocator_,
sortPerformanceEstimator_, std::ref(messageSender),
pinSubtrees, pinResult);
auto plannedQuery = co_await setupPlannedQuery(
params, query, qec, cancellationHandle, timeLimit, requestTimer);

// The usage of an `optional` here is required because of a limitation in
// Boost::Asio which forces us to use default-constructible result types with
// `computeInNewThread`. We also can't unwrap the optional directly in this
// function, because then the conan build fails in a very strange way,
// probably related to issues in GCC's coroutine implementation.
// For the same reason (crashes in the conanbuild) we store the coroutine in
// an explicit variable instead of directly `co_await`-ing it.
auto coroutine = computeInNewThread(
queryThreadPool_,
[this, &params, &query, &qec, cancellationHandle, &timeLimit,
&requestTimer]() -> std::optional<PlannedQuery> {
return setupPlannedQuery(params, query, qec, cancellationHandle,
timeLimit, requestTimer);
},
cancellationHandle);
auto plannedQueryOpt = co_await std::move(coroutine);
AD_CORRECTNESS_CHECK(plannedQueryOpt.has_value());
auto plannedQuery = std::move(plannedQueryOpt).value();

Check warning on line 830 in src/engine/Server.cpp

View check run for this annotation

Codecov / codecov/patch

src/engine/Server.cpp#L820-L830

Added lines #L820 - L830 were not covered by tests
auto qet = plannedQuery.queryExecutionTree_;

if (plannedQuery.parsedQuery_.hasUpdateClause()) {
// This may be caused by a bug (the code is not yet tested well) or by an
// attack which tries to circumvent (not yet existing) access controls for
// Update.
throw std::runtime_error("Expected normal query but received update query");
throw std::runtime_error(
absl::StrCat("SPARQL QUERY was request via the HTTP request, but the "
"following update was sent instead of an update: ",
plannedQuery.parsedQuery_._originalString));

Check warning on line 840 in src/engine/Server.cpp

View check run for this annotation

Codecov / codecov/patch

src/engine/Server.cpp#L837-L840

Added lines #L837 - L840 were not covered by tests
}

// Read the export limit from the `send` parameter (historical name). This
Expand Down Expand Up @@ -833,6 +883,80 @@
co_return;
}

// ____________________________________________________________________________
// Plan and execute a single SPARQL update against `deltaTriples`.
//
// Steps:
//  1. Read the result-pinning options from `params` and log the update.
//  2. Create a `QueryExecutionContext` and parse + plan `update` via
//     `setupPlannedQuery`.
//  3. Verify that the parsed operation actually has an update clause.
//  4. Apply it via `ExecuteUpdate::executeUpdate` and log the timing.
//
// This function is synchronous (it contains no `co_await`), so it runs on
// whichever thread calls it.
//
// Throws `std::runtime_error` if `update` parses to an operation without an
// update clause. This function has no `try`/`catch`, so any exception thrown
// by the functions it calls propagates to the caller.
void Server::processUpdateImpl(
    const ad_utility::url_parser::ParamValueMap& params, const string& update,
    ad_utility::Timer& requestTimer, TimeLimit timeLimit, auto& messageSender,
    ad_utility::SharedCancellationHandle cancellationHandle,
    DeltaTriples& deltaTriples) {
  auto [pinSubtrees, pinResult] = determineResultPinning(params);
  LOG(INFO) << "Processing the following SPARQL update:"
            << (pinResult ? " [pin result]" : "")
            << (pinSubtrees ? " [pin subresults]" : "") << "\n"
            << update << std::endl;
  QueryExecutionContext qec(index_, &cache_, allocator_,
                            sortPerformanceEstimator_, std::ref(messageSender),
                            pinSubtrees, pinResult);
  auto plannedQuery = setupPlannedQuery(params, update, qec, cancellationHandle,
                                        timeLimit, requestTimer);
  // `plannedQuery` outlives every use of the tree below, so a reference is
  // sufficient and avoids copying the execution tree.
  auto& qet = plannedQuery.queryExecutionTree_;

  // The update endpoint must only execute updates; reject plain queries.
  if (!plannedQuery.parsedQuery_.hasUpdateClause()) {
    throw std::runtime_error(absl::StrCat(
        "SPARQL UPDATE was requested via the HTTP request, but the "
        "following query was sent instead of an update: ",
        plannedQuery.parsedQuery_._originalString));
  }
  ExecuteUpdate::executeUpdate(index_, plannedQuery.parsedQuery_, qet,
                               deltaTriples, cancellationHandle);

  LOG(INFO) << "Done processing update"
            << ", total time was " << requestTimer.msecs().count() << " ms"
            << std::endl;
  LOG(DEBUG) << "Runtime Info:\n"
             << qet.getRootOperation()->runtimeInfo().toString() << std::endl;
}

Check warning on line 918 in src/engine/Server.cpp

View check run for this annotation

Codecov / codecov/patch

src/engine/Server.cpp#L913-L918

Added lines #L913 - L918 were not covered by tests

// ____________________________________________________________________________
// Coroutine that handles one SPARQL update request end to end.
//
// It creates the `MessageSender` and the cancellation handle for the request,
// runs the update on `updateThreadPool_` (inside the callback passed to
// `deltaTriplesManager().modify(...)`, which supplies the `deltaTriples`
// object that `processUpdateImpl` operates on), waits for that computation,
// and finally sends a plain-text OK response with the body
// "Update successful".
//
// `params`, `update`, `requestTimer` and `timeLimit` are forwarded unchanged
// to `processUpdateImpl`; `request` and `send` are used to build and deliver
// the HTTP response.
Awaitable<void> Server::processUpdate(
const ad_utility::url_parser::ParamValueMap& params, const string& update,
ad_utility::Timer& requestTimer,
const ad_utility::httpUtils::HttpRequest auto& request, auto&& send,
TimeLimit timeLimit) {
auto messageSender = createMessageSender(queryHub_, request, update);

Check warning on line 926 in src/engine/Server.cpp

View check run for this annotation

Codecov / codecov/patch

src/engine/Server.cpp#L925-L926

Added lines #L925 - L926 were not covered by tests

// The handle is created for this request's query id and `timeLimit`.
// NOTE(review): `cancelTimeoutOnDestruction` is never referenced below;
// presumably it is a guard whose destructor cancels the timeout — confirm.
auto [cancellationHandle, cancelTimeoutOnDestruction] =
setupCancellationHandle(messageSender.getQueryId(), timeLimit);

Check warning on line 929 in src/engine/Server.cpp

View check run for this annotation

Codecov / codecov/patch

src/engine/Server.cpp#L928-L929

Added lines #L928 - L929 were not covered by tests

// Update the delta triples.

// Note: We don't directly `co_await` because of lifetime issues (probably
// bugs in GCC or Boost) that occur in the Conan build.
// Both lambdas capture the request state by reference, so everything they
// reference must stay alive until the computation has finished.
// NOTE(review): this relies on the `co_await` below not returning before the
// function running in the thread pool is done — confirm for the cancellation
// case.
auto coroutine = computeInNewThread(
updateThreadPool_,
[this, &params, &update, &requestTimer, &timeLimit, &messageSender,
&cancellationHandle] {
index_.deltaTriplesManager().modify(
[this, &params, &update, &requestTimer, &timeLimit, &messageSender,
&cancellationHandle](auto& deltaTriples) {

Check warning on line 941 in src/engine/Server.cpp

View check run for this annotation

Codecov / codecov/patch

src/engine/Server.cpp#L935-L941

Added lines #L935 - L941 were not covered by tests
Qup42 marked this conversation as resolved.
Show resolved Hide resolved
// Use `this` explicitly to silence false-positive errors on
// captured `this` being unused.
this->processUpdateImpl(params, update, requestTimer, timeLimit,
messageSender, cancellationHandle,
deltaTriples);
});
},
cancellationHandle);
co_await std::move(coroutine);

Check warning on line 950 in src/engine/Server.cpp

View check run for this annotation

Codecov / codecov/patch

src/engine/Server.cpp#L944-L950

Added lines #L944 - L950 were not covered by tests

// TODO<qup42> send a proper response
// SPARQL 1.1 Protocol 2.2.4 Successful Responses: "The response body of a
// successful update request is implementation defined."
Qup42 marked this conversation as resolved.
Show resolved Hide resolved
co_await send(ad_utility::httpUtils::createOkResponse(
"Update successful", request, MediaType::textPlain));
co_return;
}

Check warning on line 958 in src/engine/Server.cpp

View check run for this annotation

Codecov / codecov/patch

src/engine/Server.cpp#L955-L958

Added lines #L955 - L958 were not covered by tests

// ____________________________________________________________________________
template <Server::OperationType type>
Awaitable<void> Server::processQueryOrUpdate(
Expand All @@ -858,8 +982,8 @@
co_await processQuery(params, queryOrUpdate, requestTimer, request, send,
timeLimit);
} else {
throw std::runtime_error(
"SPARQL 1.1 Update is currently not supported by QLever.");
co_await processUpdate(params, queryOrUpdate, requestTimer, request, send,
timeLimit);

Check warning on line 986 in src/engine/Server.cpp

View check run for this annotation

Codecov / codecov/patch

src/engine/Server.cpp#L985-L986

Added lines #L985 - L986 were not covered by tests
}
} catch (const ParseException& e) {
responseStatus = http::status::bad_request;
Expand Down Expand Up @@ -894,7 +1018,7 @@
exceptionErrorMsg.value().append(absl::StrCat(
" Highlighting an error for the command line log failed: ",
e.what()));
LOG(ERROR) << "Failed to highlight error in query. " << e.what()
LOG(ERROR) << "Failed to highlight error in operation. " << e.what()

Check warning on line 1021 in src/engine/Server.cpp

View check run for this annotation

Codecov / codecov/patch

src/engine/Server.cpp#L1021

Added line #L1021 was not covered by tests
<< std::endl;
LOG(ERROR) << metadata.value().query_ << std::endl;
}
Expand All @@ -915,7 +1039,8 @@

// _____________________________________________________________________________
template <std::invocable Function, typename T>
Awaitable<T> Server::computeInNewThread(Function function,
Awaitable<T> Server::computeInNewThread(net::static_thread_pool& threadPool,
Function function,
SharedCancellationHandle handle) {
// `interruptible` will set the shared state of this promise
// with a function that can be used to cancel the timer.
Expand All @@ -935,48 +1060,35 @@
// this might still block. However it will make the code check the
// cancellation handle while waiting for a thread in the pool to become ready.
return ad_utility::interruptible(
ad_utility::runFunctionOnExecutor(threadPool_.get_executor(),
ad_utility::runFunctionOnExecutor(threadPool.get_executor(),

Check warning on line 1063 in src/engine/Server.cpp

View check run for this annotation

Codecov / codecov/patch

src/engine/Server.cpp#L1063

Added line #L1063 was not covered by tests
std::move(inner), net::use_awaitable),
std::move(handle), std::move(cancelTimerPromise));
}

// _____________________________________________________________________________
net::awaitable<std::optional<Server::PlannedQuery>> Server::parseAndPlan(
Server::PlannedQuery Server::parseAndPlan(
const std::string& query, const vector<DatasetClause>& queryDatasets,
QueryExecutionContext& qec, SharedCancellationHandle handle,
TimeLimit timeLimit) {
auto handleCopy = handle;

// The usage of an `optional` here is required because of a limitation in
// Boost::Asio which forces us to use default-constructible result types with
// `computeInNewThread`. We also can't unwrap the optional directly in this
// function, because then the conan build fails in a very strange way,
// probably related to issues in GCC's coroutine implementation.
return computeInNewThread(
[&query, &qec, enablePatternTrick = enablePatternTrick_,
handle = std::move(handle), timeLimit,
&queryDatasets]() mutable -> std::optional<PlannedQuery> {
auto pq = SparqlParser::parseQuery(query);
handle->throwIfCancelled();
// SPARQL Protocol 2.1.4 specifies that the dataset from the query
// parameters overrides the dataset from the query itself.
if (!queryDatasets.empty()) {
pq.datasetClauses_ =
parsedQuery::DatasetClauses::fromClauses(queryDatasets);
}
QueryPlanner qp(&qec, handle);
qp.setEnablePatternTrick(enablePatternTrick);
auto qet = qp.createExecutionTree(pq);
handle->throwIfCancelled();
PlannedQuery plannedQuery{std::move(pq), std::move(qet)};

plannedQuery.queryExecutionTree_.getRootOperation()
->recursivelySetCancellationHandle(std::move(handle));
plannedQuery.queryExecutionTree_.getRootOperation()
->recursivelySetTimeConstraint(timeLimit);
return plannedQuery;
},
std::move(handleCopy));
TimeLimit timeLimit) const {
auto pq = SparqlParser::parseQuery(query);
handle->throwIfCancelled();

Check warning on line 1074 in src/engine/Server.cpp

View check run for this annotation

Codecov / codecov/patch

src/engine/Server.cpp#L1072-L1074

Added lines #L1072 - L1074 were not covered by tests
// SPARQL Protocol 2.1.4 specifies that the dataset from the query
// parameters overrides the dataset from the query itself.
if (!queryDatasets.empty()) {
pq.datasetClauses_ =
parsedQuery::DatasetClauses::fromClauses(queryDatasets);
}
QueryPlanner qp(&qec, handle);
qp.setEnablePatternTrick(enablePatternTrick_);
auto qet = qp.createExecutionTree(pq);
handle->throwIfCancelled();
PlannedQuery plannedQuery{std::move(pq), std::move(qet)};

Check warning on line 1085 in src/engine/Server.cpp

View check run for this annotation

Codecov / codecov/patch

src/engine/Server.cpp#L1078-L1085

Added lines #L1078 - L1085 were not covered by tests

plannedQuery.queryExecutionTree_.getRootOperation()
->recursivelySetCancellationHandle(std::move(handle));
plannedQuery.queryExecutionTree_.getRootOperation()
->recursivelySetTimeConstraint(timeLimit);
return plannedQuery;

Check warning on line 1091 in src/engine/Server.cpp

View check run for this annotation

Codecov / codecov/patch

src/engine/Server.cpp#L1087-L1091

Added lines #L1087 - L1091 were not covered by tests
}

// _____________________________________________________________________________
Expand Down
Loading
Loading