Skip to content

Commit

Permalink
Use DeltaTriplesManager to process update request (#1608)
Browse files Browse the repository at this point in the history
Add the missing code in `Server.cpp` and `Server.h` to process a parsed update request using the `DeltaTriplesManager` from #1603 .
As of this commit, QLever has initial beta support for a subset of SPARQL UPDATE, with the following limitations (all of which will be lifted in the future; the list is far from complete):
1. Updates are not yet persistent, so a restart or crash of the QLever server will delete all updates.
2. Only a single update operation per request is allowed; the syntax that atomically chains an arbitrary sequence of updates is not yet supported.
3. Only INSERT and DELETE queries are supported; support for queries that drop or add a complete graph will be added in the future.
  • Loading branch information
Qup42 authored Nov 18, 2024
1 parent daf2c30 commit 97d5037
Show file tree
Hide file tree
Showing 6 changed files with 292 additions and 90 deletions.
226 changes: 169 additions & 57 deletions src/engine/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <string>
#include <vector>

#include "engine/ExecuteUpdate.h"
#include "engine/ExportQueryExecutionTrees.h"
#include "engine/QueryPlanner.h"
#include "global/RuntimeParameters.h"
Expand Down Expand Up @@ -43,7 +44,7 @@ Server::Server(unsigned short port, size_t numThreads,
enablePatternTrick_(usePatternTrick),
// The number of server threads currently also is the number of queries
// that can be processed simultaneously.
threadPool_{numThreads} {
queryThreadPool_{numThreads} {
// This also directly triggers the update functions and propagates the
// values of the parameters to the cache.
RuntimeParameters().setOnUpdateAction<"cache-max-num-entries">(
Expand Down Expand Up @@ -394,6 +395,25 @@ Awaitable<void> Server::process(
logCommand(cmd, "clear cache completely (including unpinned elements)");
cache_.clearAll();
response = createJsonResponse(composeCacheStatsJson(), request);
} else if (auto cmd = checkParameter("cmd", "clear-delta-triples")) {
requireValidAccessToken("clear-delta-triples");
logCommand(cmd, "clear delta triples");
// The function requires a SharedCancellationHandle, but the operation is
// not cancellable.
auto handle = std::make_shared<ad_utility::CancellationHandle<>>();
// We don't directly `co_await` because of lifetime issues (bugs) in the
// Conan setup.
auto coroutine = computeInNewThread(
updateThreadPool_,
[this] {
// Use `this` explicitly to silence false-positive errors on the
// captured `this` being unused.
this->index_.deltaTriplesManager().clear();
},
handle);
co_await std::move(coroutine);
response = createOkResponse("Delta triples have been cleared", request,
MediaType::textPlain);
} else if (auto cmd = checkParameter("cmd", "get-settings")) {
logCommand(cmd, "get server settings");
response = createJsonResponse(RuntimeParameters().toMap(), request);
Expand Down Expand Up @@ -470,9 +490,10 @@ Awaitable<void> Server::process(
};
auto visitUpdate =
[&checkParameter, &accessTokenOk, &request, &send, &parameters,
&requestTimer,
this](const ad_utility::url_parser::sparqlOperation::Update& update)
&requestTimer, this, &requireValidAccessToken](
const ad_utility::url_parser::sparqlOperation::Update& update)
-> Awaitable<void> {
requireValidAccessToken("SPARQL Update");
if (auto timeLimit = co_await verifyUserSubmittedQueryTimeout(
checkParameter("timeout", std::nullopt), accessTokenOk, request,
send)) {
Expand Down Expand Up @@ -521,17 +542,17 @@ std::pair<bool, bool> Server::determineResultPinning(
checkParameter(params, "pinresult", "true").has_value();
return {pinSubtrees, pinResult};
}

// ____________________________________________________________________________
Awaitable<Server::PlannedQuery> Server::setupPlannedQuery(
Server::PlannedQuery Server::setupPlannedQuery(
const ad_utility::url_parser::ParamValueMap& params,
const std::string& operation, QueryExecutionContext& qec,
SharedCancellationHandle handle, TimeLimit timeLimit,
const ad_utility::Timer& requestTimer) {
const ad_utility::Timer& requestTimer) const {
auto queryDatasets = ad_utility::url_parser::parseDatasetClauses(params);
std::optional<PlannedQuery> plannedQuery =
co_await parseAndPlan(operation, queryDatasets, qec, handle, timeLimit);
AD_CORRECTNESS_CHECK(plannedQuery.has_value());
auto& qet = plannedQuery.value().queryExecutionTree_;
PlannedQuery plannedQuery =
parseAndPlan(operation, queryDatasets, qec, handle, timeLimit);
auto& qet = plannedQuery.queryExecutionTree_;
qet.isRoot() = true; // allow pinning of the final result
auto timeForQueryPlanning = requestTimer.msecs();
auto& runtimeInfoWholeQuery =
Expand All @@ -541,7 +562,7 @@ Awaitable<Server::PlannedQuery> Server::setupPlannedQuery(
<< " ms" << std::endl;
LOG(TRACE) << qet.getCacheKey() << std::endl;

co_return std::move(plannedQuery.value());
return plannedQuery;
}
// _____________________________________________________________________________
json Server::composeErrorResponseJson(
Expand Down Expand Up @@ -751,6 +772,18 @@ MediaType Server::determineMediaType(
return mediaType.value();
}

// ____________________________________________________________________________
// Build the `MessageSender` for one operation (query or update).
//
// `queryHub` is only held weakly, so it is locked first; a hub that is no
// longer alive is treated as a violated invariant (`AD_CORRECTNESS_CHECK`).
// The sender is constructed from the query ID obtained via
// `getQueryId(request, operation)` and the locked hub.
ad_utility::websocket::MessageSender Server::createMessageSender(
    const std::weak_ptr<ad_utility::websocket::QueryHub>& queryHub,
    const ad_utility::httpUtils::HttpRequest auto& request,
    const string& operation) {
  auto hub = queryHub.lock();
  AD_CORRECTNESS_CHECK(hub);
  return ad_utility::websocket::MessageSender{getQueryId(request, operation),
                                              *hub};
}

// ____________________________________________________________________________
Awaitable<void> Server::processQuery(
const ad_utility::url_parser::ParamValueMap& params, const string& query,
Expand All @@ -761,11 +794,8 @@ Awaitable<void> Server::processQuery(
LOG(INFO) << "Requested media type of result is \""
<< ad_utility::toString(mediaType) << "\"" << std::endl;

auto queryHub = queryHub_.lock();
AD_CORRECTNESS_CHECK(queryHub);
ad_utility::websocket::MessageSender messageSender{getQueryId(request, query),
*queryHub};

ad_utility::websocket::MessageSender messageSender =
createMessageSender(queryHub_, request, query);
auto [cancellationHandle, cancelTimeoutOnDestruction] =
setupCancellationHandle(messageSender.getQueryId(), timeLimit);

Expand All @@ -779,15 +809,35 @@ Awaitable<void> Server::processQuery(
QueryExecutionContext qec(index_, &cache_, allocator_,
sortPerformanceEstimator_, std::ref(messageSender),
pinSubtrees, pinResult);
auto plannedQuery = co_await setupPlannedQuery(
params, query, qec, cancellationHandle, timeLimit, requestTimer);

// The usage of an `optional` here is required because of a limitation in
// Boost::Asio which forces us to use default-constructible result types with
// `computeInNewThread`. We also can't unwrap the optional directly in this
// function, because then the conan build fails in a very strange way,
// probably related to issues in GCC's coroutine implementation.
// For the same reason (crashes in the Conan build) we store the coroutine in
// an explicit variable instead of directly `co_await`-ing it.
auto coroutine = computeInNewThread(
queryThreadPool_,
[this, &params, &query, &qec, cancellationHandle, &timeLimit,
&requestTimer]() -> std::optional<PlannedQuery> {
return setupPlannedQuery(params, query, qec, cancellationHandle,
timeLimit, requestTimer);
},
cancellationHandle);
auto plannedQueryOpt = co_await std::move(coroutine);
AD_CORRECTNESS_CHECK(plannedQueryOpt.has_value());
auto plannedQuery = std::move(plannedQueryOpt).value();
auto qet = plannedQuery.queryExecutionTree_;

if (plannedQuery.parsedQuery_.hasUpdateClause()) {
// This may be caused by a bug (the code is not yet tested well) or by an
// attack which tries to circumvent (not yet existing) access controls for
// Update.
throw std::runtime_error("Expected normal query but received update query");
throw std::runtime_error(
absl::StrCat("SPARQL QUERY was request via the HTTP request, but the "
"following update was sent instead of an update: ",
plannedQuery.parsedQuery_._originalString));
}

// Read the export limit from the `send` parameter (historical name). This
Expand Down Expand Up @@ -833,6 +883,80 @@ Awaitable<void> Server::processQuery(
co_return;
}

// ____________________________________________________________________________
// Plan and execute the SPARQL update `update` on `deltaTriples`.
//
// This is the synchronous part of update processing:
// 1. Read the result-pinning flags from `params`.
// 2. Parse and plan the operation via `setupPlannedQuery`.
// 3. Reject the operation if it turns out not to be an update.
// 4. Apply it to `deltaTriples` via `ExecuteUpdate::executeUpdate`.
//
// `requestTimer` is used for the planning time (inside `setupPlannedQuery`)
// and for the total time that is logged at the end. `messageSender` is passed
// on to the `QueryExecutionContext`; `cancellationHandle` and `timeLimit` are
// passed on to planning and execution.
//
// Throws `std::runtime_error` if the parsed operation has no update clause.
// Exceptions from parsing, planning, or execution are not caught here.
void Server::processUpdateImpl(
    const ad_utility::url_parser::ParamValueMap& params, const string& update,
    ad_utility::Timer& requestTimer, TimeLimit timeLimit, auto& messageSender,
    ad_utility::SharedCancellationHandle cancellationHandle,
    DeltaTriples& deltaTriples) {
  auto [pinSubtrees, pinResult] = determineResultPinning(params);
  LOG(INFO) << "Processing the following SPARQL update:"
            << (pinResult ? " [pin result]" : "")
            << (pinSubtrees ? " [pin subresults]" : "") << "\n"
            << update << std::endl;
  QueryExecutionContext qec(index_, &cache_, allocator_,
                            sortPerformanceEstimator_, std::ref(messageSender),
                            pinSubtrees, pinResult);
  auto plannedQuery = setupPlannedQuery(params, update, qec, cancellationHandle,
                                        timeLimit, requestTimer);
  // Refer to the tree inside `plannedQuery` instead of copying it; the
  // planned query outlives every use of `qet` in this function.
  auto& qet = plannedQuery.queryExecutionTree_;

  // The operation type is only known after parsing, so a query that was sent
  // to the update endpoint can only be rejected at this point.
  if (!plannedQuery.parsedQuery_.hasUpdateClause()) {
    throw std::runtime_error(absl::StrCat(
        "SPARQL UPDATE was requested via the HTTP request, but the "
        "following query was sent instead of an update: ",
        plannedQuery.parsedQuery_._originalString));
  }
  ExecuteUpdate::executeUpdate(index_, plannedQuery.parsedQuery_, qet,
                               deltaTriples, cancellationHandle);

  LOG(INFO) << "Done processing update"
            << ", total time was " << requestTimer.msecs().count() << " ms"
            << std::endl;
  LOG(DEBUG) << "Runtime Info:\n"
             << qet.getRootOperation()->runtimeInfo().toString() << std::endl;
}

// ____________________________________________________________________________
// Handle one SPARQL update request: run the update on `updateThreadPool_`
// and, after it has completed, send a plain-text OK response via `send`.
//
// `params` are the URL parameters of the request, `update` is the update
// string, `requestTimer` and `timeLimit` are passed on to
// `processUpdateImpl`. Exceptions thrown while processing the update are not
// caught here; in that case no response is sent by this function.
Awaitable<void> Server::processUpdate(
const ad_utility::url_parser::ParamValueMap& params, const string& update,
ad_utility::Timer& requestTimer,
const ad_utility::httpUtils::HttpRequest auto& request, auto&& send,
TimeLimit timeLimit) {
auto messageSender = createMessageSender(queryHub_, request, update);

// `cancelTimeoutOnDestruction` is not used below; it is only kept alive for
// the rest of this function (presumably it cancels the timeout when it is
// destroyed -- inferred from the name, confirm in `setupCancellationHandle`).
auto [cancellationHandle, cancelTimeoutOnDestruction] =
setupCancellationHandle(messageSender.getQueryId(), timeLimit);

// Update the delta triples. The actual work (`processUpdateImpl`) runs inside
// `deltaTriplesManager().modify(...)`, which passes in the `DeltaTriples`
// object that the update is applied to.

// Note: We don't directly `co_await` because of lifetime issues (probably
// bugs in GCC or Boost) that occur in the Conan build.
// Both lambdas capture everything by reference; the resulting coroutine is
// awaited right below, before any of the referenced objects is destroyed.
// NOTE(review): if `computeInNewThread` can complete (e.g. on cancellation)
// while the lambda is still running on the pool, these references could
// dangle -- confirm.
auto coroutine = computeInNewThread(
updateThreadPool_,
[this, &params, &update, &requestTimer, &timeLimit, &messageSender,
&cancellationHandle] {
index_.deltaTriplesManager().modify(
[this, &params, &update, &requestTimer, &timeLimit, &messageSender,
&cancellationHandle](auto& deltaTriples) {
// Use `this` explicitly to silence false-positive errors on
// captured `this` being unused.
this->processUpdateImpl(params, update, requestTimer, timeLimit,
messageSender, cancellationHandle,
deltaTriples);
});
},
cancellationHandle);
co_await std::move(coroutine);

// TODO<qup42> send a proper response
// SPARQL 1.1 Protocol 2.2.4 Successful Responses: "The response body of a
// successful update request is implementation defined."
// Until then, a fixed plain-text message is sent.
co_await send(ad_utility::httpUtils::createOkResponse(
"Update successful", request, MediaType::textPlain));
co_return;
}

// ____________________________________________________________________________
template <Server::OperationType type>
Awaitable<void> Server::processQueryOrUpdate(
Expand All @@ -858,8 +982,8 @@ Awaitable<void> Server::processQueryOrUpdate(
co_await processQuery(params, queryOrUpdate, requestTimer, request, send,
timeLimit);
} else {
throw std::runtime_error(
"SPARQL 1.1 Update is currently not supported by QLever.");
co_await processUpdate(params, queryOrUpdate, requestTimer, request, send,
timeLimit);
}
} catch (const ParseException& e) {
responseStatus = http::status::bad_request;
Expand Down Expand Up @@ -894,7 +1018,7 @@ Awaitable<void> Server::processQueryOrUpdate(
exceptionErrorMsg.value().append(absl::StrCat(
" Highlighting an error for the command line log failed: ",
e.what()));
LOG(ERROR) << "Failed to highlight error in query. " << e.what()
LOG(ERROR) << "Failed to highlight error in operation. " << e.what()
<< std::endl;
LOG(ERROR) << metadata.value().query_ << std::endl;
}
Expand All @@ -915,7 +1039,8 @@ Awaitable<void> Server::processQueryOrUpdate(

// _____________________________________________________________________________
template <std::invocable Function, typename T>
Awaitable<T> Server::computeInNewThread(Function function,
Awaitable<T> Server::computeInNewThread(net::static_thread_pool& threadPool,
Function function,
SharedCancellationHandle handle) {
// `interruptible` will set the shared state of this promise
// with a function that can be used to cancel the timer.
Expand All @@ -935,48 +1060,35 @@ Awaitable<T> Server::computeInNewThread(Function function,
// this might still block. However it will make the code check the
// cancellation handle while waiting for a thread in the pool to become ready.
return ad_utility::interruptible(
ad_utility::runFunctionOnExecutor(threadPool_.get_executor(),
ad_utility::runFunctionOnExecutor(threadPool.get_executor(),
std::move(inner), net::use_awaitable),
std::move(handle), std::move(cancelTimerPromise));
}

// _____________________________________________________________________________
net::awaitable<std::optional<Server::PlannedQuery>> Server::parseAndPlan(
Server::PlannedQuery Server::parseAndPlan(
const std::string& query, const vector<DatasetClause>& queryDatasets,
QueryExecutionContext& qec, SharedCancellationHandle handle,
TimeLimit timeLimit) {
auto handleCopy = handle;

// The usage of an `optional` here is required because of a limitation in
// Boost::Asio which forces us to use default-constructible result types with
// `computeInNewThread`. We also can't unwrap the optional directly in this
// function, because then the conan build fails in a very strange way,
// probably related to issues in GCC's coroutine implementation.
return computeInNewThread(
[&query, &qec, enablePatternTrick = enablePatternTrick_,
handle = std::move(handle), timeLimit,
&queryDatasets]() mutable -> std::optional<PlannedQuery> {
auto pq = SparqlParser::parseQuery(query);
handle->throwIfCancelled();
// SPARQL Protocol 2.1.4 specifies that the dataset from the query
// parameters overrides the dataset from the query itself.
if (!queryDatasets.empty()) {
pq.datasetClauses_ =
parsedQuery::DatasetClauses::fromClauses(queryDatasets);
}
QueryPlanner qp(&qec, handle);
qp.setEnablePatternTrick(enablePatternTrick);
auto qet = qp.createExecutionTree(pq);
handle->throwIfCancelled();
PlannedQuery plannedQuery{std::move(pq), std::move(qet)};

plannedQuery.queryExecutionTree_.getRootOperation()
->recursivelySetCancellationHandle(std::move(handle));
plannedQuery.queryExecutionTree_.getRootOperation()
->recursivelySetTimeConstraint(timeLimit);
return plannedQuery;
},
std::move(handleCopy));
TimeLimit timeLimit) const {
auto pq = SparqlParser::parseQuery(query);
handle->throwIfCancelled();
// SPARQL Protocol 2.1.4 specifies that the dataset from the query
// parameters overrides the dataset from the query itself.
if (!queryDatasets.empty()) {
pq.datasetClauses_ =
parsedQuery::DatasetClauses::fromClauses(queryDatasets);
}
QueryPlanner qp(&qec, handle);
qp.setEnablePatternTrick(enablePatternTrick_);
auto qet = qp.createExecutionTree(pq);
handle->throwIfCancelled();
PlannedQuery plannedQuery{std::move(pq), std::move(qet)};

plannedQuery.queryExecutionTree_.getRootOperation()
->recursivelySetCancellationHandle(std::move(handle));
plannedQuery.queryExecutionTree_.getRootOperation()
->recursivelySetTimeConstraint(timeLimit);
return plannedQuery;
}

// _____________________________________________________________________________
Expand Down
Loading

0 comments on commit 97d5037

Please sign in to comment.