Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement lazy join #1524

Merged
merged 34 commits into from
Nov 25, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
f1a67d4
Implement lazy join
RobinTF Nov 18, 2024
711c3a7
Properly reformat
RobinTF Nov 18, 2024
08be2d8
Fix race condition
RobinTF Nov 18, 2024
c9a5648
Fix lifetime issues and add documentation
RobinTF Nov 18, 2024
bc659c6
Properly reset runtime parameter after test
RobinTF Nov 19, 2024
112a757
Properly use `LocalVocab`s from fully materialized results
RobinTF Nov 19, 2024
d091bba
Properly apply permutation
RobinTF Nov 19, 2024
f585987
Spawn thread only if generator is consumed
RobinTF Nov 20, 2024
8ecc821
Fix issue with join aggregation
RobinTF Nov 20, 2024
190b8de
Fix typo
RobinTF Nov 20, 2024
3bbc466
Unify two generators
RobinTF Nov 20, 2024
3e6561c
Fix deadlock
RobinTF Nov 20, 2024
ec02b9f
Use robuster approach for thread safety
RobinTF Nov 20, 2024
cbc6252
Add test for operation failure propagation
RobinTF Nov 20, 2024
f56a00b
Merge remote-tracking branch 'origin/master' into lazy-join
Nov 20, 2024
1f54327
Show num-local-vocabs in runtime information
Nov 20, 2024
99ea8ee
Merge branch 'lazy-join' of github.com:RobinTF/qlever into lazy-join
Nov 20, 2024
1d5dcda
Fix problem from previous conflict resolution
Nov 20, 2024
b2b944f
Add unit test to ensure correct permutations
RobinTF Nov 20, 2024
1b55f45
Avoid out-of-line definition
RobinTF Nov 20, 2024
b9b7cae
Address PR comments
RobinTF Nov 21, 2024
0f4f859
Address simple PR comments
RobinTF Nov 22, 2024
62d9295
Allow more ranges to be used in `LocalVocab`
RobinTF Nov 22, 2024
c7889a6
Merge missing vocabs
RobinTF Nov 22, 2024
f1c886a
Merge remote-tracking branch 'ad-freiburg/master' into lazy-join
RobinTF Nov 22, 2024
9e357cd
Revert "Allow more ranges to be used in `LocalVocab`"
RobinTF Nov 22, 2024
711212a
Fix compilation
RobinTF Nov 22, 2024
4b51d1d
Address minor PR comments
RobinTF Nov 24, 2024
8e85cc7
Fix issues with vocab and add unit tests
RobinTF Nov 24, 2024
a8e3240
Add another unit test
RobinTF Nov 24, 2024
3f817fa
Fix typo
RobinTF Nov 24, 2024
ad0128a
Replace else case with assertion
RobinTF Nov 24, 2024
f88d4b8
Check empty table instead of vocab
RobinTF Nov 25, 2024
66cb836
Fix unit test
RobinTF Nov 25, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
267 changes: 127 additions & 140 deletions src/engine/Join.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,26 @@ cppcoro::generator<ad_utility::IdTableAndFirstCol<IdTable>> convertGenerator(
co_yield t;
}
}
joka921 marked this conversation as resolved.
Show resolved Hide resolved

// Wrap a fully materialized result in a `IdTableAndFirstCol` and an array.
RobinTF marked this conversation as resolved.
Show resolved Hide resolved
std::array<ad_utility::IdTableAndFirstCol<IdTableView<0>>, 1> asSingleTableView(
const Result& result, const std::vector<ColumnIndex>& permutation) {
return std::array{ad_utility::IdTableAndFirstCol{
result.idTable().asColumnSubsetView(permutation),
result.getCopyOfLocalVocab()}};
}

// Wrap a result either in an array with a single element or in a range wrapping
// the lazy result generator.
std::variant<decltype(convertGenerator(std::declval<Result::Generator>())),
std::array<ad_utility::IdTableAndFirstCol<IdTableView<0>>, 1>>
RobinTF marked this conversation as resolved.
Show resolved Hide resolved
resultToRange(const Result& result,
RobinTF marked this conversation as resolved.
Show resolved Hide resolved
const std::vector<ColumnIndex>& permutation) {
if (result.isFullyMaterialized()) {
return asSingleTableView(result, permutation);
}
return convertGenerator(std::move(result.idTables()), permutation);
}
} // namespace

// _____________________________________________________________________________
Expand Down Expand Up @@ -114,8 +134,7 @@ ProtoResult Join::computeResult(bool requestLaziness) {
if (_left->knownEmptyResult() || _right->knownEmptyResult()) {
_left->getRootOperation()->updateRuntimeInformationWhenOptimizedOut();
_right->getRootOperation()->updateRuntimeInformationWhenOptimizedOut();
return {IdTable{getResultWidth(), getExecutionContext()->getAllocator()},
resultSortedOn(), LocalVocab()};
return createEmptyResult();
}

// Always materialize results that meet one of the following criteria:
Expand Down Expand Up @@ -165,11 +184,7 @@ ProtoResult Join::computeResult(bool requestLaziness) {
checkCancellation();
if (leftRes->isFullyMaterialized() && leftRes->idTable().empty()) {
_right->getRootOperation()->updateRuntimeInformationWhenOptimizedOut();
// TODO<joka921, hannahbast, SPARQL update> When we add triples to the
// index, the vocabularies of index scans will not necessarily be empty and
// we need a mechanism to still retrieve them when using the lazy scan.
return {IdTable{getResultWidth(), getExecutionContext()->getAllocator()},
resultSortedOn(), LocalVocab()};
return createEmptyResult();
}

// Note: If only one of the children is a scan, then we have made sure in the
Expand All @@ -186,15 +201,8 @@ ProtoResult Join::computeResult(bool requestLaziness) {
rightResIfCached ? rightResIfCached : _right->getResult(true);
checkCancellation();
if (leftRes->isFullyMaterialized() && rightRes->isFullyMaterialized()) {
IdTable idTable{getResultWidth(), getExecutionContext()->getAllocator()};
join(leftRes->idTable(), _leftJoinCol, rightRes->idTable(), _rightJoinCol,
&idTable);
checkCancellation();

// If only one of the two operands has a non-empty local vocabulary, share
// with that one (otherwise, throws an exception).
return {std::move(idTable), resultSortedOn(),
Result::getMergedLocalVocab(*leftRes, *rightRes)};
return computeResultForTwoMaterializedInputs(std::move(leftRes),
std::move(rightRes));
}
return lazyJoin(std::move(leftRes), _leftJoinCol, std::move(rightRes),
RobinTF marked this conversation as resolved.
Show resolved Hide resolved
_rightJoinCol, requestLaziness);
Expand Down Expand Up @@ -409,6 +417,65 @@ void Join::join(const IdTable& a, ColumnIndex jc1, const IdTable& b,
<< ", size = " << result->size() << "\n";
}

// _____________________________________________________________________________
Result::Generator Join::yieldOnCallbackCalled(
ad_utility::InvocableWithExactReturnType<
Result::IdTableVocabPair,
std::function<void(IdTable&, LocalVocab&)>> auto action,
std::vector<ColumnIndex> permutation) const {
std::atomic_flag write = true;
std::variant<std::monostate, Result::IdTableVocabPair, std::exception_ptr>
storage;
ad_utility::JThread thread{[&write, &storage, &action, &permutation]() {
auto writeValue = [&write, &storage](auto value) noexcept {
storage = std::move(value);
write.clear();
write.notify_one();
};
auto writeValueAndWait = [&permutation, &write,
&writeValue](Result::IdTableVocabPair value) {
AD_CORRECTNESS_CHECK(write.test());
if (!permutation.empty()) {
value.idTable_.setColumnSubset(permutation);
}
RobinTF marked this conversation as resolved.
Show resolved Hide resolved
writeValue(std::move(value));
// Wait until we are allowed to write again.
write.wait(false);
};
auto addValue = [&writeValueAndWait](IdTable& idTable,
LocalVocab& localVocab) {
if (idTable.size() < CHUNK_SIZE) {
return;
}
writeValueAndWait(
Result::IdTableVocabPair{std::move(idTable), std::move(localVocab)});
RobinTF marked this conversation as resolved.
Show resolved Hide resolved
};
try {
auto finalValue = action(addValue);
if (!finalValue.idTable_.empty()) {
writeValueAndWait(std::move(finalValue));
}
writeValue(std::monostate{});
} catch (...) {
writeValue(std::current_exception());
}
}};
while (true) {
// Wait for read phase.
write.wait(true);
if (std::holds_alternative<std::monostate>(storage)) {
break;
}
if (std::holds_alternative<std::exception_ptr>(storage)) {
std::rethrow_exception(std::get<std::exception_ptr>(storage));
}
co_yield std::get<Result::IdTableVocabPair>(storage);
// Initiate write phase.
write.test_and_set();
write.notify_one();
}
}

// _____________________________________________________________________________
ProtoResult Join::createResult(
bool requestedLaziness,
Expand All @@ -417,64 +484,7 @@ ProtoResult Join::createResult(
std::function<void(IdTable&, LocalVocab&)>> auto action,
std::vector<ColumnIndex> permutation) const {
if (requestedLaziness) {
return {[](auto innerAction, auto innerPermutation) -> Result::Generator {
std::atomic_flag write = true;
std::variant<std::monostate, Result::IdTableVocabPair,
std::exception_ptr>
storage;
ad_utility::JThread thread{[&write, &storage, &innerAction,
&innerPermutation]() {
auto writeValue = [&write, &storage](auto value) noexcept {
storage = std::move(value);
write.clear();
write.notify_one();
};
auto addValue = [&write, &writeValue, &innerPermutation](
IdTable& idTable, LocalVocab& localVocab) {
AD_CORRECTNESS_CHECK(write.test());
if (idTable.size() < CHUNK_SIZE) {
return;
}
Result::IdTableVocabPair pair{std::move(idTable),
std::move(localVocab)};
if (!innerPermutation.empty()) {
pair.idTable_.setColumnSubset(innerPermutation);
}
writeValue(std::move(pair));
// Wait until we are allowed to write again.
write.wait(false);
};
try {
auto finalValue = innerAction(addValue);
AD_CORRECTNESS_CHECK(write.test());
if (!finalValue.idTable_.empty()) {
if (!innerPermutation.empty()) {
finalValue.idTable_.setColumnSubset(innerPermutation);
}
writeValue(std::move(finalValue));
// Wait until we are allowed to write again.
write.wait(false);
}
writeValue(std::monostate{});
} catch (...) {
writeValue(std::current_exception());
}
}};
while (true) {
// Wait for read phase.
write.wait(true);
if (std::holds_alternative<std::monostate>(storage)) {
break;
}
if (std::holds_alternative<std::exception_ptr>(storage)) {
std::rethrow_exception(std::get<std::exception_ptr>(storage));
}
co_yield std::get<Result::IdTableVocabPair>(storage);
// Initiate write phase.
write.test_and_set();
write.notify_one();
}
}(std::move(action), std::move(permutation)),
return {yieldOnCallbackCalled(std::move(action), std::move(permutation)),
resultSortedOn()};
} else {
auto [idTable, localVocab] = action(ad_utility::noop);
Expand All @@ -485,20 +495,6 @@ ProtoResult Join::createResult(
}
}

// _____________________________________________________________________________
bool Join::couldContainUndef(const auto& blocks, const auto& tree,
ColumnIndex joinColumn) {
if constexpr (std::ranges::random_access_range<decltype(blocks)>) {
AD_CORRECTNESS_CHECK(!std::ranges::empty(blocks));
return !blocks[0].empty() && blocks[0][0].isUndefined();
} else {
auto undefStatus = tree->getVariableAndInfoByColumnIndex(joinColumn)
.second.mightContainUndef_;
return undefStatus ==
ColumnIndexAndTypeInfo::UndefStatus::PossiblyUndefined;
}
}

// ______________________________________________________________________________
ProtoResult Join::lazyJoin(std::shared_ptr<const Result> a, ColumnIndex jc1,
std::shared_ptr<const Result> b, ColumnIndex jc2,
Expand All @@ -513,43 +509,18 @@ ProtoResult Join::lazyJoin(std::shared_ptr<const Result> a, ColumnIndex jc1,
auto resultPermutation = joinColMap.permutationResult();
return createResult(
requestLaziness,
[this, a = std::move(a), jc1, b = std::move(b), jc2,
[this, a = std::move(a), b = std::move(b),
joinColMap = std::move(joinColMap)](
std::function<void(IdTable&, LocalVocab&)> yieldTable) {
ad_utility::AddCombinedRowToIdTable rowAdder{
1, IdTable{getResultWidth(), getExecutionContext()->getAllocator()},
cancellationHandle_, CHUNK_SIZE, std::move(yieldTable)};
auto asSingleTableView =
[](const Result& result,
const std::vector<ColumnIndex>& permutation) {
return std::array{ad_utility::IdTableAndFirstCol{
result.idTable().asColumnSubsetView(permutation),
result.getCopyOfLocalVocab()}};
};
using Blocks = std::variant<
cppcoro::generator<ad_utility::IdTableAndFirstCol<IdTable>>,
std::array<ad_utility::IdTableAndFirstCol<IdTableView<0>>, 1>>;
auto leftRange =
a->isFullyMaterialized()
? Blocks{asSingleTableView(*a, joinColMap.permutationLeft())}
: Blocks{convertGenerator(std::move(a->idTables()),
joinColMap.permutationLeft())};
auto rightRange =
b->isFullyMaterialized()
? Blocks{asSingleTableView(*b, joinColMap.permutationRight())}
: Blocks{convertGenerator(std::move(b->idTables()),
joinColMap.permutationRight())};
1, IdTable{getResultWidth(), allocator()}, cancellationHandle_,
CHUNK_SIZE, std::move(yieldTable)};
auto leftRange = resultToRange(*a, joinColMap.permutationLeft());
auto rightRange = resultToRange(*b, joinColMap.permutationRight());
std::visit(
[this, &rowAdder, jc1, jc2](auto& leftBlocks, auto& rightBlocks) {
bool containsUndef = couldContainUndef(leftBlocks, _left, jc1) ||
couldContainUndef(rightBlocks, _right, jc2);
if (containsUndef) {
ad_utility::zipperJoinForBlocksWithPotentialUndef(
leftBlocks, rightBlocks, std::less{}, rowAdder);
} else {
ad_utility::zipperJoinForBlocksWithoutUndef(
leftBlocks, rightBlocks, std::less{}, rowAdder);
}
[&rowAdder](auto& leftBlocks, auto& rightBlocks) {
ad_utility::zipperJoinForBlocksWithPotentialUndef(
leftBlocks, rightBlocks, std::less{}, rowAdder);
},
leftRange, rightRange);
auto localVocab = std::move(rowAdder.localVocab());
Expand Down Expand Up @@ -668,7 +639,7 @@ void Join::hashJoin(const IdTable& dynA, ColumnIndex jc1, const IdTable& dynB,
// ___________________________________________________________________________
template <typename ROW_A, typename ROW_B, int TABLE_WIDTH>
void Join::addCombinedRowToIdTable(const ROW_A& rowA, const ROW_B& rowB,
const ColumnIndex jcRowB,
ColumnIndex jcRowB,
IdTableStatic<TABLE_WIDTH>* table) {
// Add a new, empty row.
const size_t backIndex = table->size();
Expand Down Expand Up @@ -748,8 +719,8 @@ ProtoResult Join::computeResultForTwoIndexScans(bool requestLaziness) const {
// `AddCombinedRowToIdTable` class to work correctly.
AD_CORRECTNESS_CHECK(_leftJoinCol == 0 && _rightJoinCol == 0);
ad_utility::AddCombinedRowToIdTable rowAdder{
1, IdTable{getResultWidth(), getExecutionContext()->getAllocator()},
cancellationHandle_, CHUNK_SIZE, std::move(yieldTable)};
1, IdTable{getResultWidth(), allocator()}, cancellationHandle_,
CHUNK_SIZE, std::move(yieldTable)};

ad_utility::Timer timer{
ad_utility::timer::Timer::InitialStatus::Started};
Expand Down Expand Up @@ -803,8 +774,8 @@ ProtoResult Join::computeResultForIndexScanAndIdTable(
std::function<void(IdTable&, LocalVocab&)> yieldTable) {
const IdTable& idTable = resultWithIdTable->idTable();
ad_utility::AddCombinedRowToIdTable rowAdder{
1, IdTable{getResultWidth(), getExecutionContext()->getAllocator()},
cancellationHandle_, CHUNK_SIZE, std::move(yieldTable)};
1, IdTable{getResultWidth(), allocator()}, cancellationHandle_,
CHUNK_SIZE, std::move(yieldTable)};

AD_CORRECTNESS_CHECK(joinColScan == 0);
auto permutationIdTable = ad_utility::IdTableAndFirstCol{
Expand All @@ -819,13 +790,13 @@ ProtoResult Join::computeResultForIndexScanAndIdTable(
!idTable.empty() && idTable.at(0, joinColTable).isUndefined();
std::optional<std::shared_ptr<const Result>> indexScanResult =
std::nullopt;
using FirstColView = ad_utility::IdTableAndFirstCol<IdTable>;
using GenWithDetails =
cppcoro::generator<FirstColView,
CompressedRelationReader::LazyScanMetadata>;
using GenWithDetails = decltype(convertGenerator(
RobinTF marked this conversation as resolved.
Show resolved Hide resolved
std::declval<Permutation::IdTableGenerator>()));
auto rightBlocks = [&scan, idTableHasUndef, &permutationIdTable,
&indexScanResult]()
-> std::variant<cppcoro::generator<FirstColView>, GenWithDetails> {
-> std::variant<decltype(convertGenerator(
std::declval<Result::Generator>())),
RobinTF marked this conversation as resolved.
Show resolved Hide resolved
GenWithDetails> {
if (idTableHasUndef) {
indexScanResult =
scan->getResult(false, ComputationMode::LAZY_IF_SUPPORTED);
Expand All @@ -841,15 +812,13 @@ ProtoResult Join::computeResultForIndexScanAndIdTable(
}();

runtimeInfo().addDetail("time-for-filtering-blocks", timer.msecs());
auto doJoin = [&rowAdder, idTableHasUndef](auto& left,
auto& right) mutable {
if (idTableHasUndef) {
ad_utility::zipperJoinForBlocksWithPotentialUndef(
left, right, std::less{}, rowAdder);
} else {
ad_utility::zipperJoinForBlocksWithoutUndef(left, right,
std::less{}, rowAdder);
}
auto doJoin = [&rowAdder](auto& left, auto& right) mutable {
// Technically we could use the zipperJoinForBlocksWithoutUndef when
// checking for `idTableHasUndef`, but the implementation of the join
// algorithm consumes all undef blocks at the start and falls back to
// the other implementation when no undef blocks could be found.
RobinTF marked this conversation as resolved.
Show resolved Hide resolved
ad_utility::zipperJoinForBlocksWithPotentialUndef(
left, right, std::less{}, rowAdder);
};
auto blockForIdTable = std::array{std::move(permutationIdTable)};
std::visit(
Expand All @@ -873,3 +842,21 @@ ProtoResult Join::computeResultForIndexScanAndIdTable(
},
std::move(resultPermutation));
}
// _____________________________________________________________________________
ProtoResult Join::computeResultForTwoMaterializedInputs(
std::shared_ptr<const Result> leftRes,
std::shared_ptr<const Result> rightRes) const {
IdTable idTable{getResultWidth(), allocator()};
join(leftRes->idTable(), _leftJoinCol, rightRes->idTable(), _rightJoinCol,
&idTable);
checkCancellation();

return {std::move(idTable), resultSortedOn(),
Result::getMergedLocalVocab(*leftRes, *rightRes)};
}

// _____________________________________________________________________________
ProtoResult Join::createEmptyResult() const {
return {IdTable{getResultWidth(), allocator()}, resultSortedOn(),
LocalVocab{}};
}
Loading
Loading