Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement lazy join #1524

Merged
merged 34 commits into from
Nov 25, 2024
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
f1a67d4
Implement lazy join
RobinTF Nov 18, 2024
711c3a7
Properly reformat
RobinTF Nov 18, 2024
08be2d8
Fix race condition
RobinTF Nov 18, 2024
c9a5648
Fix lifetime issues and add documentation
RobinTF Nov 18, 2024
bc659c6
Properly reset runtime parameter after test
RobinTF Nov 19, 2024
112a757
Properly use `LocalVocab`s from fully materialized results
RobinTF Nov 19, 2024
d091bba
Properly apply permutation
RobinTF Nov 19, 2024
f585987
Spawn thread only if generator is consumed
RobinTF Nov 20, 2024
8ecc821
Fix issue with join aggregation
RobinTF Nov 20, 2024
190b8de
Fix typo
RobinTF Nov 20, 2024
3bbc466
Unify two generators
RobinTF Nov 20, 2024
3e6561c
Fix deadlock
RobinTF Nov 20, 2024
ec02b9f
Use robuster approach for thread safety
RobinTF Nov 20, 2024
cbc6252
Add test for operation failure propagation
RobinTF Nov 20, 2024
f56a00b
Merge remote-tracking branch 'origin/master' into lazy-join
Nov 20, 2024
1f54327
Show num-local-vocabs in runtime information
Nov 20, 2024
99ea8ee
Merge branch 'lazy-join' of github.com:RobinTF/qlever into lazy-join
Nov 20, 2024
1d5dcda
Fix problem from previous conflict resolution
Nov 20, 2024
b2b944f
Add unit test to ensure correct permutations
RobinTF Nov 20, 2024
1b55f45
Avoid out-of-line definition
RobinTF Nov 20, 2024
b9b7cae
Address PR comments
RobinTF Nov 21, 2024
0f4f859
Address simple PR comments
RobinTF Nov 22, 2024
62d9295
Allow more ranges to be used in `LocalVocab`
RobinTF Nov 22, 2024
c7889a6
Merge missing vocabs
RobinTF Nov 22, 2024
f1c886a
Merge remote-tracking branch 'ad-freiburg/master' into lazy-join
RobinTF Nov 22, 2024
9e357cd
Revert "Allow more ranges to be used in `LocalVocab`"
RobinTF Nov 22, 2024
711212a
Fix compilation
RobinTF Nov 22, 2024
4b51d1d
Address minor PR comments
RobinTF Nov 24, 2024
8e85cc7
Fix issues with vocab and add unit tests
RobinTF Nov 24, 2024
a8e3240
Add another unit test
RobinTF Nov 24, 2024
3f817fa
Fix typo
RobinTF Nov 24, 2024
ad0128a
Replace else case with assertion
RobinTF Nov 24, 2024
f88d4b8
Check empty table instead of vocab
RobinTF Nov 25, 2024
66cb836
Fix unit test
RobinTF Nov 25, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions src/engine/AddCombinedRowToTable.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ class AddCombinedRowToIdTable {
size_t numJoinColumns_;
std::optional<std::array<IdTableView<0>, 2>> inputLeftAndRight_;
IdTable resultTable_;
LocalVocab mergedVocab_{};

// This struct stores the information, which row indices from the input are
// combined into a given row index in the output, i.e. "To obtain the
Expand Down Expand Up @@ -62,7 +63,7 @@ class AddCombinedRowToIdTable {
// This callback is called with the result as an argument each time `flush()`
// is called. It can be used to consume parts of the result early, before the
// complete operation has finished.
using BlockwiseCallback = std::function<void(IdTable&)>;
using BlockwiseCallback = std::function<void(IdTable&, LocalVocab&)>;
[[no_unique_address]] BlockwiseCallback blockwiseCallback_{ad_utility::noop};

using CancellationHandle = ad_utility::SharedCancellationHandle;
Expand Down Expand Up @@ -138,10 +139,17 @@ class AddCombinedRowToIdTable {
return table;
}
};
auto mergeVocab = [this]<typename T>(const T& table) {
if constexpr (requires { table.getLocalVocab(); }) {
mergedVocab_.mergeWith(std::span{&table.getLocalVocab(), 1});
}
};
if (nextIndex_ != 0) {
RobinTF marked this conversation as resolved.
Show resolved Hide resolved
AD_CORRECTNESS_CHECK(inputLeftAndRight_.has_value());
flush();
}
mergeVocab(inputLeft);
mergeVocab(inputRight);
inputLeftAndRight_ = std::array{toView(inputLeft), toView(inputRight)};
checkNumColumns();
}
Expand Down Expand Up @@ -188,6 +196,8 @@ class AddCombinedRowToIdTable {
return std::move(resultTable_);
}

LocalVocab& localVocab() { return mergedVocab_; }

// Disable copying and moving, it is currently not needed and makes it harder
// to reason about
AddCombinedRowToIdTable(const AddCombinedRowToIdTable&) = delete;
Expand Down Expand Up @@ -320,7 +330,7 @@ class AddCombinedRowToIdTable {
indexBuffer_.clear();
optionalIndexBuffer_.clear();
nextIndex_ = 0;
std::invoke(blockwiseCallback_, result);
std::invoke(blockwiseCallback_, result, mergedVocab_);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We definitely have to talk about the LocalVocab stuff, as this really aggressively merges the local vocabs over and over again in the following cases:

  • The undef blocks are set over and over again
  • We have lazy inputs, but a fully materialized result [The most common case]
  • We have many cartesian blocks (but there we typically have much more input rows than input vocabularies).

I see two potential angles here:

  1. Deduplicate inside the LocalVocab class (the most aggressive way: Use a HashMap instead of a vector for the otherWordSets.
  2. Fiddle with the internals of the join here (hard to do, because we have to further hack the ZipperJoinWithBlocksAndUndef to be aware of the result being fully materialized etc. That's why I am a little bit against it.
    I will ask @hannahbast what she thinks about 1. It will only have a performance impact for inputs with very many nonempty local vocabs, and these are typically slower anyway, so it seems feasible to me.

RobinTF marked this conversation as resolved.
Show resolved Hide resolved
}
const IdTableView<0>& inputLeft() const {
return inputLeftAndRight_.value()[0];
Expand Down
461 changes: 333 additions & 128 deletions src/engine/Join.cpp

Large diffs are not rendered by default.

47 changes: 40 additions & 7 deletions src/engine/Join.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
#include "engine/QueryExecutionTree.h"
#include "util/HashMap.h"
#include "util/HashSet.h"
#include "util/JoinAlgorithms/JoinAlgorithms.h"
#include "util/TypeTraits.h"

class Join : public Operation {
private:
Expand All @@ -33,6 +33,7 @@ class Join : public Operation {
std::shared_ptr<QueryExecutionTree> t2, ColumnIndex t1JoinCol,
ColumnIndex t2JoinCol);

static constexpr size_t CHUNK_SIZE = 100'000;
// A very explicit constructor, which initializes an invalid join object (it
// has no subtrees, which violates class invariants). These invalid Join
// objects can be used for unit tests that only test member functions which
Expand Down Expand Up @@ -93,6 +94,38 @@ class Join : public Operation {
void join(const IdTable& a, ColumnIndex jc1, const IdTable& b,
ColumnIndex jc2, IdTable* result) const;

// Helper function to compute the result of a join operation and conditionally
// return a lazy or fully materialized result depending on `requestLaziness`.
// This is achieved by running the `action` lambda in a separate thread and
// returning a lazy result that reads from the queue of the thread. If
// `requestLaziness` is false, the result is fully materialized and returned
// directly.
// `permutation` indicates a permutation to apply to the result columns before
// yielding/returning them. An empty vector means no permutation is applied.
// `action` is a lambda that can be used to send partial chunks to a consumer
// in addition to returning the remaining result. If laziness is not required
// it is a no-op.
ProtoResult createResult(
bool requestedLaziness,
ad_utility::InvocableWithExactReturnType<
Result::IdTableVocabPair,
std::function<void(IdTable&, LocalVocab&)>> auto action,
std::vector<ColumnIndex> permutation = {}) const;

// Helper function that cheaply checks if a join could contain undefined. For
// fully materialized tables it can just look at the first element. For lazy
// tables it has to look at the meta information which could potentially
// indicate undefinedness even when all values are defined.
static bool couldContainUndef(const auto& blocks, const auto& tree,
RobinTF marked this conversation as resolved.
Show resolved Hide resolved
ColumnIndex joinColumn);

// Fallback implementation of a join that is used when at least one of the two
// inputs is not fully materialized. This represents the general case where we
// don't have any optimization left to try.
ProtoResult lazyJoin(std::shared_ptr<const Result> a, ColumnIndex jc1,
std::shared_ptr<const Result> b, ColumnIndex jc2,
bool requestLaziness) const;

/**
* @brief Joins IdTables dynA and dynB on join column jc2, returning
* the result in dynRes. Creates a cross product for matching rows by putting
Expand All @@ -113,24 +146,24 @@ class Join : public Operation {
virtual string getCacheKeyImpl() const override;

private:
ProtoResult computeResult([[maybe_unused]] bool requestLaziness) override;
ProtoResult computeResult(bool requestLaziness) override;

VariableToColumnMap computeVariableToColumnMap() const override;

// A special implementation that is called when both children are
// `IndexScan`s. Uses the lazy scans to only retrieve the subset of the
// `IndexScan`s that is actually needed without fully materializing them.
IdTable computeResultForTwoIndexScans();
ProtoResult computeResultForTwoIndexScans(bool requestLaziness) const;

// A special implementation that is called when one of the children is an
// `IndexScan`. The argument `scanIsLeft` determines whether the `IndexScan`
// is the left or the right child of this `Join`. This needs to be known to
// determine the correct order of the columns in the result.
template <bool scanIsLeft>
IdTable computeResultForIndexScanAndIdTable(const IdTable& idTable,
ColumnIndex joinColTable,
IndexScan& scan,
ColumnIndex joinColScan);
ProtoResult computeResultForIndexScanAndIdTable(
bool requestLaziness, std::shared_ptr<const Result> resultWithIdTable,
ColumnIndex joinColTable, std::shared_ptr<IndexScan> scan,
ColumnIndex joinColScan) const;

/*
* @brief Combines 2 rows like in a join and inserts the result in the
Expand Down
3 changes: 3 additions & 0 deletions src/engine/LocalVocab.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@ class LocalVocab {
// Return true if and only if the local vocabulary is empty.
bool empty() const { return size() == 0; }

// The number of set stores (primary set and other sets).
size_t numSets() const { return 1 + otherWordSets_.size(); }

// Get the `LocalVocabEntry` corresponding to the given `LocalVocabIndex`.
//
// NOTE: This used to be a more complex function but is now a simple
Expand Down
4 changes: 4 additions & 0 deletions src/engine/Operation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,10 @@ ProtoResult Operation::runComputation(const ad_utility::Timer& timer,
// correctly because the result was computed, so we can pass `nullopt` as
// the last argument.
if (result.isFullyMaterialized()) {
size_t numLocalVocabs = result.localVocab().numSets();
if (numLocalVocabs > 1) {
runtimeInfo().addDetail("num-local-vocabs", numLocalVocabs);
}
updateRuntimeInformationOnSuccess(result.idTable().size(),
ad_utility::CacheStatus::computed,
timer.msecs(), std::nullopt);
Expand Down
36 changes: 18 additions & 18 deletions src/index/IndexImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -192,24 +192,24 @@ IndexImpl::buildOspWithPatterns(
// them to the queue.
IdTable outputBufferTable{NumColumnsIndexBuilding + 2,
ad_utility::makeUnlimitedAllocator<Id>()};
auto pushToQueue =
[&, bufferSize =
BUFFER_SIZE_JOIN_PATTERNS_WITH_OSP.load()](IdTable& table) {
if (table.numRows() >= bufferSize) {
if (!outputBufferTable.empty()) {
queue.push(std::move(outputBufferTable));
outputBufferTable.clear();
}
queue.push(std::move(table));
} else {
outputBufferTable.insertAtEnd(table.begin(), table.end());
if (outputBufferTable.size() >= bufferSize) {
queue.push(std::move(outputBufferTable));
outputBufferTable.clear();
}
}
table.clear();
};
auto pushToQueue = [&, bufferSize =
BUFFER_SIZE_JOIN_PATTERNS_WITH_OSP.load()](
IdTable& table, LocalVocab&) {
if (table.numRows() >= bufferSize) {
if (!outputBufferTable.empty()) {
queue.push(std::move(outputBufferTable));
outputBufferTable.clear();
}
queue.push(std::move(table));
} else {
outputBufferTable.insertAtEnd(table.begin(), table.end());
if (outputBufferTable.size() >= bufferSize) {
queue.push(std::move(outputBufferTable));
outputBufferTable.clear();
}
}
table.clear();
};

lazyOptionalJoinOnFirstColumn(ospAsBlocksTransformed, lazyPatternScan,
pushToQueue);
Expand Down
9 changes: 8 additions & 1 deletion src/util/JoinAlgorithms/JoinColumnMapping.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include <cstdint>
#include <vector>

#include "engine/LocalVocab.h"
#include "util/Algorithm.h"

namespace ad_utility {
Expand Down Expand Up @@ -108,6 +109,7 @@ template <typename Table>
struct IdTableAndFirstCol {
private:
Table table_;
LocalVocab localVocab_;

public:
// Typedef needed for generic interfaces.
Expand All @@ -116,7 +118,8 @@ struct IdTableAndFirstCol {
std::decay_t<decltype(std::as_const(table_).getColumn(0).begin())>;

// Construct by taking ownership of the table.
explicit IdTableAndFirstCol(Table t) : table_{std::move(t)} {}
IdTableAndFirstCol(Table t, LocalVocab localVocab)
: table_{std::move(t)}, localVocab_{std::move(localVocab)} {}

// Get access to the first column.
decltype(auto) col() { return table_.getColumn(0); }
Expand All @@ -131,6 +134,8 @@ struct IdTableAndFirstCol {
bool empty() const { return col().empty(); }

const Id& operator[](size_t idx) const { return col()[idx]; }
const Id& front() const { return col().front(); }
const Id& back() const { return col().back(); }

size_t size() const { return col().size(); }

Expand All @@ -141,5 +146,7 @@ struct IdTableAndFirstCol {
IdTableView<I> asStaticView() const {
return table_.template asStaticView<I>();
}

const LocalVocab& getLocalVocab() const { return localVocab_; }
};
} // namespace ad_utility
Loading
Loading