Skip to content

Commit

Permalink
Lazy import for SERVICE (#1491)
Browse files Browse the repository at this point in the history
Integrate the `LazyJsonParser` introduced in #1412 into the `SERVICE` Operation, which will help to reduce RAM usage for the import of large results. In particular, the (possibly large) JSON result of a SERVICE will not be fully materialized, but converted to a (possibly much smaller) `IdTable` on the fly. This is a preparation for making the SERVICE operation completely lazy.
  • Loading branch information
UNEXENU authored Sep 18, 2024
1 parent 8a978f4 commit b70df93
Show file tree
Hide file tree
Showing 6 changed files with 275 additions and 114 deletions.
213 changes: 145 additions & 68 deletions src/engine/Service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,19 +143,17 @@ ProtoResult Service::computeResultImpl([[maybe_unused]] bool requestLaziness) {
serviceQuery, "application/sparql-query",
"application/sparql-results+json");

std::basic_string<char, std::char_traits<char>,
ad_utility::AllocatorWithLimit<char>>
jsonStr(_executionContext->getAllocator());
for (std::span<std::byte> bytes : response.body_) {
jsonStr.append(reinterpret_cast<const char*>(bytes.data()), bytes.size());
checkCancellation();
}

auto throwErrorWithContext = [&jsonStr, &serviceUrl](std::string_view sv) {
throw std::runtime_error(absl::StrCat(
"Error while executing a SERVICE request to <", serviceUrl.asString(),
">: ", sv, ". First 100 bytes of the response: ",
std::string_view{jsonStr.data()}.substr(0, 100)));
auto throwErrorWithContext = [this, &response](std::string_view sv) {
std::string ctx;
ctx.reserve(100);
for (const auto& bytes : std::move(response.body_)) {
ctx += std::string(reinterpret_cast<const char*>(bytes.data()),
bytes.size());
if (ctx.size() >= 100) {
break;
}
}
this->throwErrorWithContext(sv, std::string_view(ctx).substr(0, 100));
};

// Verify status and content-type of the response.
Expand All @@ -173,71 +171,95 @@ ProtoResult Service::computeResultImpl([[maybe_unused]] bool requestLaziness) {
response.contentType_, "'"));
}

// Parse the received result.
std::vector<std::string> resVariables;
std::vector<nlohmann::json> resBindings;
try {
auto jsonResult = nlohmann::json::parse(jsonStr);

if (jsonResult.empty()) {
throwErrorWithContext("Response from SPARQL endpoint is empty");
}

resVariables = jsonResult["head"]["vars"].get<std::vector<std::string>>();
resBindings =
jsonResult["results"]["bindings"].get<std::vector<nlohmann::json>>();
} catch (const nlohmann::json::parse_error&) {
throwErrorWithContext("Failed to parse the SERVICE result as JSON");
} catch (const nlohmann::json::type_error&) {
throwErrorWithContext("JSON result does not have the expected structure");
}

// Check if result header row is expected.
std::string headerRow = absl::StrCat("?", absl::StrJoin(resVariables, " ?"));
std::string expectedHeaderRow = absl::StrJoin(
parsedServiceClause_.visibleVariables_, " ", Variable::AbslFormatter);
if (headerRow != expectedHeaderRow) {
throwErrorWithContext(absl::StrCat(
"Header row of JSON result for SERVICE query is \"", headerRow,
"\", but expected \"", expectedHeaderRow, "\""));
}
// Prepare the expected Variables as keys for the JSON-bindings. We can't wait
// for the variables sent in the response as they're maybe not read before
// the bindings.
std::vector<std::string> expVariableKeys;
std::ranges::transform(parsedServiceClause_.visibleVariables_,
std::back_inserter(expVariableKeys),
[](const Variable& v) { return v.name().substr(1); });

// Set basic properties of the result table.
IdTable idTable{getResultWidth(), getExecutionContext()->getAllocator()};
LocalVocab localVocab{};

auto body = ad_utility::LazyJsonParser::parse(std::move(response.body_),
{"results", "bindings"});

// Fill the result table using the `writeJsonResult` method below.
size_t resWidth = getResultWidth();
CALL_FIXED_SIZE(resWidth, &Service::writeJsonResult, this, resVariables,
resBindings, &idTable, &localVocab);
CALL_FIXED_SIZE(getResultWidth(), &Service::writeJsonResult, this,
expVariableKeys, body, &idTable, &localVocab);

return {std::move(idTable), resultSortedOn(), std::move(localVocab)};
}

// ____________________________________________________________________________
template <size_t I>
void Service::writeJsonResult(const std::vector<std::string>& vars,
const std::vector<nlohmann::json>& bindings,
ad_utility::LazyJsonParser::Generator& body,
IdTable* idTablePtr, LocalVocab* localVocab) {
IdTableStatic<I> idTable = std::move(*idTablePtr).toStatic<I>();
checkCancellation();
std::vector<size_t> numLocalVocabPerColumn(idTable.numColumns());

auto writeBindings = [&](const nlohmann::json& bindings, size_t& rowIdx) {
for (const auto& binding : bindings) {
idTable.emplace_back();
for (size_t colIdx = 0; colIdx < vars.size(); ++colIdx) {
TripleComponent tc =
binding.contains(vars[colIdx])
? bindingToTripleComponent(binding[vars[colIdx]])
: TripleComponent::UNDEF();

Id id = std::move(tc).toValueId(getIndex().getVocab(), *localVocab);
idTable(rowIdx, colIdx) = id;
if (id.getDatatype() == Datatype::LocalVocabIndex) {
++numLocalVocabPerColumn[colIdx];
}
}
rowIdx++;
checkCancellation();
}
};

size_t rowIdx = 0;
for (const auto& binding : bindings) {
idTable.emplace_back();
for (size_t colIdx = 0; colIdx < vars.size(); ++colIdx) {
TripleComponent tc = binding.contains(vars[colIdx])
? bindingToTripleComponent(binding[vars[colIdx]])
: TripleComponent::UNDEF();

Id id = std::move(tc).toValueId(getIndex().getVocab(), *localVocab);
idTable(rowIdx, colIdx) = id;
if (id.getDatatype() == Datatype::LocalVocabIndex) {
++numLocalVocabPerColumn[colIdx];
bool resultExists{false};
bool varsChecked{false};
try {
for (const nlohmann::json& part : body) {
if (part.contains("head")) {
AD_CORRECTNESS_CHECK(!varsChecked);
verifyVariables(part["head"], body.details());
varsChecked = true;
}
// The LazyJsonParser only yields parts containing the "bindings" array,
// therefore we can assume its existence here.
AD_CORRECTNESS_CHECK(part.contains("results") &&
part["results"].contains("bindings") &&
part["results"]["bindings"].is_array());
writeBindings(part["results"]["bindings"], rowIdx);
resultExists = true;
}
rowIdx++;
checkCancellation();
} catch (const ad_utility::LazyJsonParser::Error& e) {
throwErrorWithContext(
absl::StrCat("Parser failed with error: '", e.what(), "'"),
body.details().first100_, body.details().last100_);
}

// As the LazyJsonParser only passes parts of the result that match the
// expected structure, no result implies an unexpected structure.
if (!resultExists) {
throwErrorWithContext(
"JSON result does not have the expected structure (results section "
"missing)",
body.details().first100_, body.details().last100_);
}

if (!varsChecked) {
throwErrorWithContext(
"JSON result does not have the expected structure (head section "
"missing)",
body.details().first100_, body.details().last100_);
}

AD_CORRECTNESS_CHECK(rowIdx == idTable.size());
Expand Down Expand Up @@ -309,24 +331,27 @@ std::optional<std::string> Service::getSiblingValuesClause() const {
}

// ____________________________________________________________________________
TripleComponent Service::bindingToTripleComponent(const nlohmann::json& cell) {
if (!cell.contains("type") || !cell.contains("value")) {
throw std::runtime_error("Missing type or value field in binding.");
TripleComponent Service::bindingToTripleComponent(
const nlohmann::json& binding) {
if (!binding.contains("type") || !binding.contains("value")) {
throw std::runtime_error(absl::StrCat(
"Missing type or value field in binding. The binding is: '",
binding.dump(), "'"));
}

const auto type = cell["type"].get<std::string_view>();
const auto value = cell["value"].get<std::string_view>();
const auto type = binding["type"].get<std::string_view>();
const auto value = binding["value"].get<std::string_view>();

TripleComponent tc;
if (type == "literal") {
if (cell.contains("datatype")) {
if (binding.contains("datatype")) {
tc = TurtleParser<TokenizerCtre>::literalAndDatatypeToTripleComponent(
value, TripleComponent::Iri::fromIrirefWithoutBrackets(
cell["datatype"].get<std::string_view>()));
} else if (cell.contains("xml:lang")) {
binding["datatype"].get<std::string_view>()));
} else if (binding.contains("xml:lang")) {
tc = TripleComponent::Literal::literalWithNormalizedContent(
asNormalizedStringViewUnsafe(value),
cell["xml:lang"].get<std::string>());
binding["xml:lang"].get<std::string>());
} else {
tc = TripleComponent::Literal::literalWithNormalizedContent(
asNormalizedStringViewUnsafe(value));
Expand All @@ -339,7 +364,9 @@ TripleComponent Service::bindingToTripleComponent(const nlohmann::json& cell) {
"For now, consider filtering them out using the ISBLANK function or "
"converting them via the STR function.");
} else {
throw std::runtime_error(absl::StrCat("Type ", type, " is undefined."));
throw std::runtime_error(absl::StrCat("Type ", type,
" is undefined. The binding is: '",
binding.dump(), "'"));
}
return tc;
}
Expand All @@ -353,3 +380,53 @@ ProtoResult Service::makeNeutralElementResultForSilentFail() const {
}
return {std::move(idTable), resultSortedOn(), LocalVocab{}};
}

// ____________________________________________________________________________
void Service::verifyVariables(
const nlohmann::json& head,
const ad_utility::LazyJsonParser::Details& details) const {
std::vector<std::string> vars;
try {
vars = head.at("vars").get<std::vector<std::string>>();
} catch (...) {
throw std::runtime_error(
absl::StrCat("JSON result does not have the expected structure, as its "
"\"head\" section is not according to the SPARQL "
"standard. The \"head\" section is: '",
head.dump(), "'."));
}

ad_utility::HashSet<Variable> responseVars;
for (const auto& v : vars) {
responseVars.emplace("?" + v);
}
ad_utility::HashSet<Variable> expectedVars(
parsedServiceClause_.visibleVariables_.begin(),
parsedServiceClause_.visibleVariables_.end());

if (responseVars != expectedVars) {
throwErrorWithContext(
absl::StrCat("Header row of JSON result for SERVICE query is \"",
absl::StrCat("?", absl::StrJoin(vars, " ?")),
"\", but expected \"",
absl::StrJoin(parsedServiceClause_.visibleVariables_, " ",
Variable::AbslFormatter),
"\". Probable cause: The remote endpoint sent a JSON "
"response that is not according to the SPARQL Standard"),
details.first100_, details.last100_);
}
}

// ____________________________________________________________________________
void Service::throwErrorWithContext(std::string_view msg,
std::string_view first100,
std::string_view last100) const {
const ad_utility::httpUtils::Url serviceUrl{
asStringViewUnsafe(parsedServiceClause_.serviceIri_.getContent())};

throw std::runtime_error(absl::StrCat(
"Error while executing a SERVICE request to <", serviceUrl.asString(),
">: ", msg, ". First 100 bytes of the response: '", first100,
(last100.empty() ? "'"
: absl::StrCat(", last 100 bytes: '", last100, "'"))));
}
17 changes: 15 additions & 2 deletions src/engine/Service.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include "engine/Operation.h"
#include "engine/Values.h"
#include "parser/ParsedQuery.h"
#include "util/LazyJsonParser.h"
#include "util/http/HttpClient.h"

// The SERVICE operation. Sends a query to the remote endpoint specified by the
Expand Down Expand Up @@ -98,7 +99,8 @@ class Service : public Operation {
vector<QueryExecutionTree*> getChildren() override { return {}; }

// Convert the given binding to TripleComponent.
static TripleComponent bindingToTripleComponent(const nlohmann::json& cell);
static TripleComponent bindingToTripleComponent(
const nlohmann::json& binding);

private:
// The string returned by this function is used as cache key.
Expand All @@ -116,13 +118,24 @@ class Service : public Operation {
// Create result for silent fail.
ProtoResult makeNeutralElementResultForSilentFail() const;

// Check that all visible variables of the SERVICE clause exist in the json
// object, otherwise throw an error.
void verifyVariables(const nlohmann::json& head,
const ad_utility::LazyJsonParser::Details& gen) const;

// Throws an error message, providing the first 100 bytes of the result as
// context.
[[noreturn]] void throwErrorWithContext(
std::string_view msg, std::string_view first100,
std::string_view last100 = ""sv) const;

// Write the given JSON result to the given result object. The `I` is the
// width of the result table.
//
// NOTE: This is similar to `Values::writeValues`, except that we have to
// parse JSON here and not a VALUES clause.
template <size_t I>
void writeJsonResult(const std::vector<std::string>& vars,
const std::vector<nlohmann::json>& bindings,
ad_utility::LazyJsonParser::Generator& response,
IdTable* idTable, LocalVocab* localVocab);
};
36 changes: 32 additions & 4 deletions src/util/LazyJsonParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,18 @@
namespace ad_utility {

// ____________________________________________________________________________
cppcoro::generator<nlohmann::json> LazyJsonParser::parse(
cppcoro::generator<std::string> partialJson,
LazyJsonParser::Generator LazyJsonParser::parse(
cppcoro::generator<std::string_view> partialJson,
std::vector<std::string> arrayPath) {
LazyJsonParser p(std::move(arrayPath));
Details& details = co_await cppcoro::getDetails;
for (const auto& chunk : partialJson) {
if (details.first100_.size() < 100) {
details.first100_ += chunk.substr(0, 100);
}
details.last100_ =
chunk.substr(std::max(0, static_cast<int>(chunk.size() - 100)), 100);

if (auto res = p.parseChunk(chunk); res.has_value()) {
co_yield res;
if (p.endReached_) {
Expand All @@ -29,6 +36,21 @@ cppcoro::generator<nlohmann::json> LazyJsonParser::parse(
}
}

// ____________________________________________________________________________
LazyJsonParser::Generator LazyJsonParser::parse(
cppcoro::generator<std::span<std::byte>> partialJson,
std::vector<std::string> arrayPath) {
return parse(
[](cppcoro::generator<std::span<std::byte>> partialJson)
-> cppcoro::generator<std::string_view> {
for (const auto& bytes : partialJson) {
co_yield std::string_view(reinterpret_cast<const char*>(bytes.data()),
bytes.size());
}
}(std::move(partialJson)),
std::move(arrayPath));
}

// ____________________________________________________________________________
LazyJsonParser::LazyJsonParser(std::vector<std::string> arrayPath)
: arrayPath_(std::move(arrayPath)),
Expand Down Expand Up @@ -230,7 +252,9 @@ std::optional<nlohmann::json> LazyJsonParser::constructResultFromParsedChunk(
size_t nextChunkStart =
materializeEnd == 0 ? 0 : std::min(materializeEnd + 1, input_.size());
if (input_.size() - nextChunkStart >= 1'000'000) {
throw std::runtime_error("Ill formed Json.");
throw Error(
"QLever currently doesn't support SERVICE results where a single "
"result row is larger than 1MB");
}
if (nextChunkStart == 0) {
return std::nullopt;
Expand All @@ -255,7 +279,11 @@ std::optional<nlohmann::json> LazyJsonParser::constructResultFromParsedChunk(
absl::StrAppend(&resStr, suffixInArray_);
}

return nlohmann::json::parse(resStr);
try {
return nlohmann::json::parse(resStr);
} catch (const nlohmann::json::parse_error& e) {
throw Error(e.what());
}
}

// ____________________________________________________________________________
Expand Down
Loading

0 comments on commit b70df93

Please sign in to comment.