Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement lazy join #1524

Merged
merged 34 commits into from
Nov 25, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
f1a67d4
Implement lazy join
RobinTF Nov 18, 2024
711c3a7
Properly reformat
RobinTF Nov 18, 2024
08be2d8
Fix race condition
RobinTF Nov 18, 2024
c9a5648
Fix lifetime issues and add documentation
RobinTF Nov 18, 2024
bc659c6
Properly reset runtime parameter after test
RobinTF Nov 19, 2024
112a757
Properly use `LocalVocab`s from fully materialized results
RobinTF Nov 19, 2024
d091bba
Properly apply permutation
RobinTF Nov 19, 2024
f585987
Spawn thread only if generator is consumed
RobinTF Nov 20, 2024
8ecc821
Fix issue with join aggregation
RobinTF Nov 20, 2024
190b8de
Fix typo
RobinTF Nov 20, 2024
3bbc466
Unify two generators
RobinTF Nov 20, 2024
3e6561c
Fix deadlock
RobinTF Nov 20, 2024
ec02b9f
Use robuster approach for thread safety
RobinTF Nov 20, 2024
cbc6252
Add test for operation failure propagation
RobinTF Nov 20, 2024
f56a00b
Merge remote-tracking branch 'origin/master' into lazy-join
Nov 20, 2024
1f54327
Show num-local-vocabs in runtime information
Nov 20, 2024
99ea8ee
Merge branch 'lazy-join' of github.com:RobinTF/qlever into lazy-join
Nov 20, 2024
1d5dcda
Fix problem from previous conflict resolution
Nov 20, 2024
b2b944f
Add unit test to ensure correct permutations
RobinTF Nov 20, 2024
1b55f45
Avoid out-of-line definition
RobinTF Nov 20, 2024
b9b7cae
Address PR comments
RobinTF Nov 21, 2024
0f4f859
Address simple PR comments
RobinTF Nov 22, 2024
62d9295
Allow more ranges to be used in `LocalVocab`
RobinTF Nov 22, 2024
c7889a6
Merge missing vocabs
RobinTF Nov 22, 2024
f1c886a
Merge remote-tracking branch 'ad-freiburg/master' into lazy-join
RobinTF Nov 22, 2024
9e357cd
Revert "Allow more ranges to be used in `LocalVocab`"
RobinTF Nov 22, 2024
711212a
Fix compilation
RobinTF Nov 22, 2024
4b51d1d
Address minor PR comments
RobinTF Nov 24, 2024
8e85cc7
Fix issues with vocab and add unit tests
RobinTF Nov 24, 2024
a8e3240
Add another unit test
RobinTF Nov 24, 2024
3f817fa
Fix typo
RobinTF Nov 24, 2024
ad0128a
Replace else case with assertion
RobinTF Nov 24, 2024
f88d4b8
Check empty table instead of vocab
RobinTF Nov 25, 2024
66cb836
Fix unit test
RobinTF Nov 25, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 16 additions & 3 deletions src/engine/AddCombinedRowToTable.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ class AddCombinedRowToIdTable {
std::optional<std::array<IdTableView<0>, 2>> inputLeftAndRight_;
IdTable resultTable_;
LocalVocab mergedVocab_{};
std::array<const LocalVocab*, 2> currentVocabs_{nullptr, nullptr};

// This struct stores the information, which row indices from the input are
// combined into a given row index in the output, i.e. "To obtain the
Expand Down Expand Up @@ -139,17 +140,21 @@ class AddCombinedRowToIdTable {
return table;
}
};
auto mergeVocab = [this]<typename T>(const T& table) {
auto mergeVocab = [this]<typename T>(const T& table,
const LocalVocab*& currentVocab) {
if constexpr (requires { table.getLocalVocab(); }) {
currentVocab = &table.getLocalVocab();
mergedVocab_.mergeWith(std::span{&table.getLocalVocab(), 1});
} else {
currentVocab = nullptr;
}
};
if (nextIndex_ != 0) {
RobinTF marked this conversation as resolved.
Show resolved Hide resolved
AD_CORRECTNESS_CHECK(inputLeftAndRight_.has_value());
flush();
}
mergeVocab(inputLeft);
mergeVocab(inputRight);
mergeVocab(inputLeft, currentVocabs_.at(0));
mergeVocab(inputRight, currentVocabs_.at(1));
inputLeftAndRight_ = std::array{toView(inputLeft), toView(inputRight)};
checkNumColumns();
}
Expand Down Expand Up @@ -331,6 +336,14 @@ class AddCombinedRowToIdTable {
optionalIndexBuffer_.clear();
nextIndex_ = 0;
std::invoke(blockwiseCallback_, result, mergedVocab_);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We definitely have to talk about the LocalVocab stuff, as this really aggressively merges the local vocabs over and over again in the following cases:

  • The undef blocks are set over and over again
  • We have lazy inputs, but a fully materialized result [The most common case]
  • We have many cartesian blocks (but there we typically have much more input rows than input vocabularies).

I see two potential angles here:

  1. Deduplicate inside the LocalVocab class (the most aggressive way: Use a HashMap instead of a vector for the otherWordSets.
  2. Fiddle with the internals of the join here (hard to do, because we have to further hack the ZipperJoinWithBlocksAndUndef to be aware of the result being fully materialized etc. That's why I am a little bit against it.
    I will ask @hannahbast what she thinks about 1. It will only have a performance impact for inputs with very many nonempty local vocabs, and these are typically slower anyway, so it seems feasible to me.

RobinTF marked this conversation as resolved.
Show resolved Hide resolved
// The current `IdTable`s might still be active, so we have to merge the
// local vocabs again if all other sets were moved-out.
if (mergedVocab_.numSets() == 1) {
RobinTF marked this conversation as resolved.
Show resolved Hide resolved
// Only merge non-null vocabs.
auto range = currentVocabs_ | std::views::filter(toBool) |
std::views::transform(dereference);
mergedVocab_.mergeWith(std::ranges::ref_view{range});
RobinTF marked this conversation as resolved.
Show resolved Hide resolved
}
}
const IdTableView<0>& inputLeft() const {
return inputLeftAndRight_.value()[0];
Expand Down
85 changes: 45 additions & 40 deletions src/engine/Join.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,36 +26,49 @@ using std::endl;
using std::string;

namespace {
void applyPermutation(
IdTable& idTable,
const std::optional<std::vector<ColumnIndex>>& permutation) {
RobinTF marked this conversation as resolved.
Show resolved Hide resolved
if (permutation.has_value()) {
idTable.setColumnSubset(permutation.value());
}
}

using LazyInputView =
cppcoro::generator<ad_utility::IdTableAndFirstCol<IdTable>>;
// Convert a `generator<IdTableVocab>` to a `generator<IdTableAndFirstCol>` for
// more efficient access in the join columns below and apply the given
// permutation to each table.
cppcoro::generator<ad_utility::IdTableAndFirstCol<IdTable>> convertGenerator(
Result::Generator gen, std::vector<ColumnIndex> permutation = {}) {
LazyInputView convertGenerator(
Result::Generator gen,
std::optional<std::vector<ColumnIndex>> permutation = {}) {
for (auto& [table, localVocab] : gen) {
applyPermutation(table, permutation);
// Make sure to actually move the table into the wrapper so that the tables
// live as long as the wrapper.
if (!permutation.empty()) {
table.setColumnSubset(permutation);
}
ad_utility::IdTableAndFirstCol t{std::move(table), std::move(localVocab)};
co_yield t;
}
}
joka921 marked this conversation as resolved.
Show resolved Hide resolved

// Wrap a fully materialized result in a `IdTableAndFirstCol` and an array.
std::array<ad_utility::IdTableAndFirstCol<IdTableView<0>>, 1> asSingleTableView(
using MaterializedInputView =
std::array<ad_utility::IdTableAndFirstCol<IdTableView<0>>, 1>;
// Wrap a fully materialized result in a `IdTableAndFirstCol` and an array. It
// then fulfills the concept `view<IdTableAndFirstCol>` which is required by the
// lazy join algorithms. Note: The `convertGenerator` function above
// conceptually does exactly the same for lazy inputs.
MaterializedInputView asSingleTableView(
const Result& result, const std::vector<ColumnIndex>& permutation) {
return std::array{ad_utility::IdTableAndFirstCol{
result.idTable().asColumnSubsetView(permutation),
result.getCopyOfLocalVocab()}};
}

// Wrap a result either in an array with a single element or in a range wrapping
// the lazy result generator.
std::variant<decltype(convertGenerator(std::declval<Result::Generator>())),
std::array<ad_utility::IdTableAndFirstCol<IdTableView<0>>, 1>>
resultToRange(const Result& result,
const std::vector<ColumnIndex>& permutation) {
// the lazy result generator. Note that the lifetime of the view is coupled to
// the lifetime of the result.
std::variant<LazyInputView, MaterializedInputView> resultToView(
const Result& result, const std::vector<ColumnIndex>& permutation) {
if (result.isFullyMaterialized()) {
return asSingleTableView(result, permutation);
}
Expand Down Expand Up @@ -418,11 +431,11 @@ void Join::join(const IdTable& a, ColumnIndex jc1, const IdTable& b,
}

// _____________________________________________________________________________
Result::Generator Join::yieldOnCallbackCalled(
Result::Generator Join::runLazyJoinAndConvertToGenerator(
ad_utility::InvocableWithExactReturnType<
Result::IdTableVocabPair,
std::function<void(IdTable&, LocalVocab&)>> auto action,
std::vector<ColumnIndex> permutation) const {
std::optional<std::vector<ColumnIndex>> permutation) const {
std::atomic_flag write = true;
std::variant<std::monostate, Result::IdTableVocabPair, std::exception_ptr>
storage;
Expand All @@ -435,9 +448,7 @@ Result::Generator Join::yieldOnCallbackCalled(
auto writeValueAndWait = [&permutation, &write,
&writeValue](Result::IdTableVocabPair value) {
AD_CORRECTNESS_CHECK(write.test());
if (!permutation.empty()) {
value.idTable_.setColumnSubset(permutation);
}
applyPermutation(value.idTable_, permutation);
writeValue(std::move(value));
// Wait until we are allowed to write again.
write.wait(false);
Expand All @@ -447,8 +458,7 @@ Result::Generator Join::yieldOnCallbackCalled(
if (idTable.size() < CHUNK_SIZE) {
return;
}
writeValueAndWait(
Result::IdTableVocabPair{std::move(idTable), std::move(localVocab)});
writeValueAndWait({std::move(idTable), std::move(localVocab)});
};
try {
auto finalValue = action(addValue);
Expand Down Expand Up @@ -482,15 +492,14 @@ ProtoResult Join::createResult(
ad_utility::InvocableWithExactReturnType<
Result::IdTableVocabPair,
std::function<void(IdTable&, LocalVocab&)>> auto action,
std::vector<ColumnIndex> permutation) const {
std::optional<std::vector<ColumnIndex>> permutation) const {
if (requestedLaziness) {
return {yieldOnCallbackCalled(std::move(action), std::move(permutation)),
return {runLazyJoinAndConvertToGenerator(std::move(action),
std::move(permutation)),
resultSortedOn()};
} else {
auto [idTable, localVocab] = action(ad_utility::noop);
if (!permutation.empty()) {
idTable.setColumnSubset(permutation);
}
applyPermutation(idTable, permutation);
return {std::move(idTable), resultSortedOn(), std::move(localVocab)};
}
}
Expand All @@ -515,8 +524,8 @@ ProtoResult Join::lazyJoin(std::shared_ptr<const Result> a, ColumnIndex jc1,
ad_utility::AddCombinedRowToIdTable rowAdder{
1, IdTable{getResultWidth(), allocator()}, cancellationHandle_,
CHUNK_SIZE, std::move(yieldTable)};
auto leftRange = resultToRange(*a, joinColMap.permutationLeft());
auto rightRange = resultToRange(*b, joinColMap.permutationRight());
auto leftRange = resultToView(*a, joinColMap.permutationLeft());
auto rightRange = resultToView(*b, joinColMap.permutationRight());
std::visit(
[&rowAdder](auto& leftBlocks, auto& rightBlocks) {
ad_utility::zipperJoinForBlocksWithPotentialUndef(
Expand Down Expand Up @@ -662,11 +671,12 @@ void Join::addCombinedRowToIdTable(const ROW_A& rowA, const ROW_B& rowB,
}

namespace {
using GeneratorWithDetails =
cppcoro::generator<ad_utility::IdTableAndFirstCol<IdTable>,
CompressedRelationReader::LazyScanMetadata>;
// Convert a `generator<IdTable` to a `generator<IdTableAndFirstCol>` for more
// efficient access in the join columns below.
cppcoro::generator<ad_utility::IdTableAndFirstCol<IdTable>,
CompressedRelationReader::LazyScanMetadata>
convertGenerator(Permutation::IdTableGenerator gen) {
GeneratorWithDetails convertGenerator(Permutation::IdTableGenerator gen) {
co_await cppcoro::getDetails = gen.details();
gen.setDetailsPointer(&co_await cppcoro::getDetails);
for (auto& table : gen) {
Expand Down Expand Up @@ -790,13 +800,9 @@ ProtoResult Join::computeResultForIndexScanAndIdTable(
!idTable.empty() && idTable.at(0, joinColTable).isUndefined();
std::optional<std::shared_ptr<const Result>> indexScanResult =
std::nullopt;
using GenWithDetails = decltype(convertGenerator(
std::declval<Permutation::IdTableGenerator>()));
auto rightBlocks = [&scan, idTableHasUndef, &permutationIdTable,
&indexScanResult]()
-> std::variant<decltype(convertGenerator(
std::declval<Result::Generator>())),
GenWithDetails> {
-> std::variant<LazyInputView, GeneratorWithDetails> {
if (idTableHasUndef) {
indexScanResult =
scan->getResult(false, ComputationMode::LAZY_IF_SUPPORTED);
Expand All @@ -813,10 +819,9 @@ ProtoResult Join::computeResultForIndexScanAndIdTable(

runtimeInfo().addDetail("time-for-filtering-blocks", timer.msecs());
auto doJoin = [&rowAdder](auto& left, auto& right) mutable {
// Technically we could use the zipperJoinForBlocksWithoutUndef when
// checking for `idTableHasUndef`, but the implementation of the join
// algorithm consumes all undef blocks at the start and falls back to
// the other implementation when no undef blocks could be found.
// Note: The `zipperJoinForBlocksWithPotentialUndef` automatically
// switches to a more efficient implementation if there are no UNDEF
// values in any of the inputs.
ad_utility::zipperJoinForBlocksWithPotentialUndef(
left, right, std::less{}, rowAdder);
};
Expand All @@ -831,9 +836,9 @@ ProtoResult Join::computeResultForIndexScanAndIdTable(
},
rightBlocks);

if (std::holds_alternative<GenWithDetails>(rightBlocks)) {
if (std::holds_alternative<GeneratorWithDetails>(rightBlocks)) {
updateRuntimeInfoForLazyScan(
*scan, std::get<GenWithDetails>(rightBlocks).details());
*scan, std::get<GeneratorWithDetails>(rightBlocks).details());
}

auto localVocab = std::move(rowAdder.localVocab());
Expand Down
13 changes: 8 additions & 5 deletions src/engine/Join.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,13 +97,16 @@ class Join : public Operation {
private:
// Part of the implementation of `createResult`. This function is called when
// the result should be yielded lazily.
// The semantics of action can be seen as
// runJoinAndReturnFinalResult(callbackForIntermediateResults).
Result::Generator yieldOnCallbackCalled(
// Action is a lambda that itself runs the join operation in a blocking
// manner. It is passed a special function that is supposed to be the callback
// being passed to the `AddCombinedRowToIdTable` so that the partial results
// can be yielded during execution. This is achieved by spawning a separate
// thread.
Result::Generator runLazyJoinAndConvertToGenerator(
ad_utility::InvocableWithExactReturnType<
Result::IdTableVocabPair,
std::function<void(IdTable&, LocalVocab&)>> auto action,
std::vector<ColumnIndex> permutation) const;
std::optional<std::vector<ColumnIndex>> permutation) const;

public:
// Helper function to compute the result of a join operation and conditionally
Expand All @@ -122,7 +125,7 @@ class Join : public Operation {
ad_utility::InvocableWithExactReturnType<
Result::IdTableVocabPair,
std::function<void(IdTable&, LocalVocab&)>> auto action,
std::vector<ColumnIndex> permutation = {}) const;
std::optional<std::vector<ColumnIndex>> permutation = {}) const;

// Fallback implementation of a join that is used when at least one of the two
// inputs is not fully materialized. This represents the general case where we
Expand Down
Loading
Loading