Skip to content

Commit

Permalink
Also make joins between two index scans individually observable.
Browse files Browse the repository at this point in the history
Signed-off-by: Johannes Kalmbach <[email protected]>
  • Loading branch information
joka921 committed Dec 5, 2024
1 parent ec806f0 commit 8a362a0
Show file tree
Hide file tree
Showing 8 changed files with 140 additions and 56 deletions.
78 changes: 75 additions & 3 deletions src/engine/IndexScan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -204,15 +204,25 @@ std::shared_ptr<QueryExecutionTree> IndexScan::makeCopyWithAddedPrefilters(
}

// _____________________________________________________________________________
Result::Generator IndexScan::chunkedIndexScan() const {
Result::Generator IndexScan::chunkedIndexScan() {
auto optBlockSpan = getBlockMetadata();
if (!optBlockSpan.has_value()) {
co_return;
}
const auto& blockSpan = optBlockSpan.value();
size_t numBlocksAll = blockSpan.size();
// Note: Given a `PrefilterIndexPair` is available, the corresponding
// prefiltering will be applied in `getLazyScan`.
for (IdTable& idTable : getLazyScan({blockSpan.begin(), blockSpan.end()})) {
auto innerGenerator = getLazyScan({blockSpan.begin(), blockSpan.end()});
auto setDetails = ad_utility::makeOnDestructionDontThrowDuringStackUnwinding(
[

this, numBlocksAll, &innerGenerator]() {
auto details = innerGenerator.details();
details.numBlocksAll_ = numBlocksAll;
updateRuntimeInfoForLazyScan(details);
});
for (IdTable& idTable : innerGenerator) {
co_yield {std::move(idTable), LocalVocab{}};
}
}
Expand Down Expand Up @@ -339,7 +349,14 @@ IndexScan::getBlockMetadata() const {
// _____________________________________________________________________________
std::optional<std::vector<CompressedBlockMetadata>>
IndexScan::getBlockMetadataOptionallyPrefiltered() const {
auto optBlockSpan = getBlockMetadata();
auto optBlockSpan =
[&]() -> std::optional<std::span<const CompressedBlockMetadata>> {
if (prefilteredBlocks_.has_value()) {
return prefilteredBlocks_.value();
} else {
return getBlockMetadata();
}
}();
std::optional<std::vector<CompressedBlockMetadata>> optBlocks = std::nullopt;
if (optBlockSpan.has_value()) {
const auto& blockSpan = optBlockSpan.value();
Expand Down Expand Up @@ -389,6 +406,7 @@ std::optional<Permutation::MetadataAndBlocks> IndexScan::getMetadataForScan()
};

// _____________________________________________________________________________
// TODO<joka921> This can be removed now.
std::array<Permutation::IdTableGenerator, 2>
IndexScan::lazyScanForJoinOfTwoScans(const IndexScan& s1, const IndexScan& s2) {
AD_CONTRACT_CHECK(s1.numVariables_ <= 3 && s2.numVariables_ <= 3);
Expand Down Expand Up @@ -656,3 +674,57 @@ std::pair<Result::Generator, Result::Generator> IndexScan::prefilterTables(
return {createPrefilteredJoinSide(state),
createPrefilteredIndexScanSide(state)};
}

// _____________________________________________________________________________
void IndexScan::setBlocksForJoinOfIndexScans(Operation* left,
Operation* right) {
auto& leftScan = dynamic_cast<IndexScan&>(*left);
auto& rightScan = dynamic_cast<IndexScan&>(*right);

auto getBlocks = [](IndexScan& scan) {
auto metaBlocks = scan.getMetadataForScan();
if (!metaBlocks.has_value()) {
return metaBlocks;
}

Check warning on line 688 in src/engine/IndexScan.cpp

View check run for this annotation

Codecov / codecov/patch

src/engine/IndexScan.cpp#L687-L688

Added lines #L687 - L688 were not covered by tests
if (scan.prefilteredBlocks_.has_value()) {
metaBlocks.value().blockMetadata_ = scan.prefilteredBlocks_.value();
}
return metaBlocks;
};

auto metaBlocks1 = getBlocks(leftScan);
auto metaBlocks2 = getBlocks(rightScan);
if (!metaBlocks1.has_value() || !metaBlocks2.has_value()) {
return;
}

Check warning on line 699 in src/engine/IndexScan.cpp

View check run for this annotation

Codecov / codecov/patch

src/engine/IndexScan.cpp#L698-L699

Added lines #L698 - L699 were not covered by tests
auto [blocks1, blocks2] = CompressedRelationReader::getBlocksForJoin(
metaBlocks1.value(), metaBlocks2.value());
leftScan.prefilteredBlocks_ = std::move(blocks1);
rightScan.prefilteredBlocks_ = std::move(blocks2);
}

// _____________________________________________________________________________
std::vector<Operation*> IndexScan::getIndexScansForSortVariables(
std::span<const Variable> variables) {
const auto& sorted = resultSortedOn();
if (resultSortedOn().size() < variables.size()) {
return {};
}

Check warning on line 712 in src/engine/IndexScan.cpp

View check run for this annotation

Codecov / codecov/patch

src/engine/IndexScan.cpp#L711-L712

Added lines #L711 - L712 were not covered by tests
const auto& varColMap = getExternallyVisibleVariableColumns();
for (size_t i = 0; i < variables.size(); ++i) {
auto it = varColMap.find(variables[i]);
if (it == varColMap.end() ||
it->second.columnIndex_ != resultSortedOn().at(i)) {
return {};
}

Check warning on line 719 in src/engine/IndexScan.cpp

View check run for this annotation

Codecov / codecov/patch

src/engine/IndexScan.cpp#L718-L719

Added lines #L718 - L719 were not covered by tests
}
return {this};
}

// _____________________________________________________________________________
void IndexScan::setPrefilteredBlocks(
std::vector<CompressedBlockMetadata> prefilteredBlocks) {
prefilteredBlocks_ = std::move(prefilteredBlocks);

Check warning on line 727 in src/engine/IndexScan.cpp

View check run for this annotation

Codecov / codecov/patch

src/engine/IndexScan.cpp#L726-L727

Added lines #L726 - L727 were not covered by tests
// TODO<joka921> once the other PR is merged we have to assert that the result
// is never cached AD_CORRECTNESS_CHECK(!canBeStoredInCache());
}

Check warning on line 730 in src/engine/IndexScan.cpp

View check run for this annotation

Codecov / codecov/patch

src/engine/IndexScan.cpp#L730

Added line #L730 was not covered by tests
15 changes: 14 additions & 1 deletion src/engine/IndexScan.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ class IndexScan final : public Operation {
std::vector<ColumnIndex> additionalColumns_;
std::vector<Variable> additionalVariables_;

// TODO<joka921> Comment
std::optional<std::vector<CompressedBlockMetadata>> prefilteredBlocks_;

public:
IndexScan(QueryExecutionContext* qec, Permutation::Enum permutation,
const SparqlTriple& triple, Graphs graphsToFilter = std::nullopt,
Expand Down Expand Up @@ -108,6 +111,9 @@ class IndexScan final : public Operation {
std::pair<Result::Generator, Result::Generator> prefilterTables(
Result::Generator input, ColumnIndex joinColumn);

// TODO<joka921> Comment
static void setBlocksForJoinOfIndexScans(Operation* left, Operation* right);

private:
// Implementation detail that allows to consume a generator from two other
// cooperating generators. Needs to be forward declared as it is used by
Expand Down Expand Up @@ -202,7 +208,7 @@ class IndexScan final : public Operation {
PrefilterIndexPair prefilter) const;

// Return the (lazy) `IdTable` for this `IndexScan` in chunks.
Result::Generator chunkedIndexScan() const;
Result::Generator chunkedIndexScan();
// Get the `IdTable` for this `IndexScan` in one piece.
IdTable materializedIndexScan() const;

Expand Down Expand Up @@ -234,4 +240,11 @@ class IndexScan final : public Operation {
Permutation::IdTableGenerator getLazyScan(
std::vector<CompressedBlockMetadata> blocks) const;
std::optional<Permutation::MetadataAndBlocks> getMetadataForScan() const;

// TODO<joka921> Comment.
void setPrefilteredBlocks(
std::vector<CompressedBlockMetadata> prefilteredBlocks);

std::vector<Operation*> getIndexScansForSortVariables(
std::span<const Variable> variables) override;
};
67 changes: 21 additions & 46 deletions src/engine/Join.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,16 @@ ProtoResult Join::computeResult(bool requestLaziness) {
auto rightResIfCached = getCachedOrSmallResult(*_right);
checkCancellation();

// TODO<joka921> Copy and move to separate function.
std::span joinVarSpan{&_joinVar, 1};
auto leftIndexScans = _left->getIndexScansForSortVariables(joinVarSpan);
auto rightIndexScans = _right->getIndexScansForSortVariables(joinVarSpan);
for (auto* left : leftIndexScans) {
for (auto* right : rightIndexScans) {
IndexScan::setBlocksForJoinOfIndexScans(left, right);
}
}

auto leftIndexScan =
std::dynamic_pointer_cast<IndexScan>(_left->getRootOperation());
if (leftIndexScan &&
Expand All @@ -189,9 +199,6 @@ ProtoResult Join::computeResult(bool requestLaziness) {
AD_CORRECTNESS_CHECK(rightResIfCached->isFullyMaterialized());
return computeResultForIndexScanAndIdTable<true>(
requestLaziness, std::move(rightResIfCached), leftIndexScan);

} else if (!leftResIfCached) {
return computeResultForTwoIndexScans(requestLaziness);
}
}

Expand All @@ -216,9 +223,10 @@ ProtoResult Join::computeResult(bool requestLaziness) {
if (leftRes->isFullyMaterialized()) {
return computeResultForIndexScanAndIdTable<false>(
requestLaziness, std::move(leftRes), rightIndexScan);
} else if (!leftIndexScan) {
return computeResultForIndexScanAndLazyOperation(
requestLaziness, std::move(leftRes), rightIndexScan);
}
return computeResultForIndexScanAndLazyOperation(
requestLaziness, std::move(leftRes), rightIndexScan);
}

std::shared_ptr<const Result> rightRes =
Expand Down Expand Up @@ -647,47 +655,6 @@ void Join::addCombinedRowToIdTable(const ROW_A& rowA, const ROW_B& rowB,
}
}

// ______________________________________________________________________________________________________
ProtoResult Join::computeResultForTwoIndexScans(bool requestLaziness) const {
return createResult(
requestLaziness,
[this](std::function<void(IdTable&, LocalVocab&)> yieldTable) {
auto leftScan =
std::dynamic_pointer_cast<IndexScan>(_left->getRootOperation());
auto rightScan =
std::dynamic_pointer_cast<IndexScan>(_right->getRootOperation());
AD_CORRECTNESS_CHECK(leftScan && rightScan);
// The join column already is the first column in both inputs, so we
// don't have to permute the inputs and results for the
// `AddCombinedRowToIdTable` class to work correctly.
AD_CORRECTNESS_CHECK(_leftJoinCol == 0 && _rightJoinCol == 0);
auto rowAdder = makeRowAdder(std::move(yieldTable));

ad_utility::Timer timer{
ad_utility::timer::Timer::InitialStatus::Started};
auto [leftBlocksInternal, rightBlocksInternal] =
IndexScan::lazyScanForJoinOfTwoScans(*leftScan, *rightScan);
runtimeInfo().addDetail("time-for-filtering-blocks", timer.msecs());

auto leftBlocks = convertGenerator(std::move(leftBlocksInternal));
auto rightBlocks = convertGenerator(std::move(rightBlocksInternal));

ad_utility::zipperJoinForBlocksWithoutUndef(leftBlocks, rightBlocks,
std::less{}, rowAdder);

leftScan->updateRuntimeInfoForLazyScan(leftBlocks.details());
rightScan->updateRuntimeInfoForLazyScan(rightBlocks.details());

AD_CORRECTNESS_CHECK(leftBlocks.details().numBlocksRead_ <=
rightBlocks.details().numElementsRead_);
AD_CORRECTNESS_CHECK(rightBlocks.details().numBlocksRead_ <=
leftBlocks.details().numElementsRead_);
auto localVocab = std::move(rowAdder.localVocab());
return Result::IdTableVocabPair{std::move(rowAdder).resultTable(),
std::move(localVocab)};
});
}

// ______________________________________________________________________________________________________
template <bool idTableIsRightInput>
ProtoResult Join::computeResultForIndexScanAndIdTable(
Expand Down Expand Up @@ -834,3 +801,11 @@ ad_utility::AddCombinedRowToIdTable Join::makeRowAdder(
1, IdTable{getResultWidth(), allocator()}, cancellationHandle_,
CHUNK_SIZE, std::move(callback)};
}
// _____________________________________________________________________________
std::vector<Operation*> Join::getIndexScansForSortVariables(
std::span<const Variable> variables) {
auto result = _left->getIndexScansForSortVariables(variables);
auto right = _right->getIndexScansForSortVariables(variables);
result.insert(result.end(), right.begin(), right.end());
return result;
}

Check warning on line 811 in src/engine/Join.cpp

View check run for this annotation

Codecov / codecov/patch

src/engine/Join.cpp#L806-L811

Added lines #L806 - L811 were not covered by tests
9 changes: 4 additions & 5 deletions src/engine/Join.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,10 @@ class Join : public Operation {
static void hashJoin(const IdTable& dynA, ColumnIndex jc1,
const IdTable& dynB, ColumnIndex jc2, IdTable* dynRes);

// TODO<joka921> Comment.
std::vector<Operation*> getIndexScansForSortVariables(
std::span<const Variable> variables) override;

protected:
virtual string getCacheKeyImpl() const override;

Expand All @@ -149,11 +153,6 @@ class Join : public Operation {

VariableToColumnMap computeVariableToColumnMap() const override;

// A special implementation that is called when both children are
// `IndexScan`s. Uses the lazy scans to only retrieve the subset of the
// `IndexScan`s that is actually needed without fully materializing them.
ProtoResult computeResultForTwoIndexScans(bool requestLaziness) const;

// A special implementation that is called when exactly one of the children is
// an `IndexScan` and the other one is a fully materialized result. The
// argument `idTableIsRightInput` determines whether the `IndexScan` is the
Expand Down
6 changes: 6 additions & 0 deletions src/engine/Operation.h
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,12 @@ class Operation {
RuntimeInformation::Status status =
RuntimeInformation::Status::optimizedOut);

// TODO<joka921> Comment.
virtual std::vector<Operation*> getIndexScansForSortVariables(
[[maybe_unused]] std::span<const Variable> variables) {
return {};
}

private:
// Create the runtime information in case the evaluation of this operation has
// failed.
Expand Down
15 changes: 15 additions & 0 deletions src/engine/QueryExecutionTree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -225,3 +225,18 @@ QueryExecutionTree::getVariableAndInfoByColumnIndex(ColumnIndex colIdx) const {
AD_CONTRACT_CHECK(it != varColMap.end());
return *it;
}

// _____________________________________________________________________________
std::vector<Operation*> QueryExecutionTree::getIndexScansForSortVariables(
std::span<const Variable> variables) {
auto result = rootOperation_->getIndexScansForSortVariables(variables);
if (result.empty()) {
return result;
}
// TODO<joka921> We have to disable the caching as soon as the PR for that is
// merged.
// rootOperation_->disableCaching();
cachedResult_.reset();
sizeEstimate_.reset();
return result;
}
4 changes: 4 additions & 0 deletions src/engine/QueryExecutionTree.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,10 @@ class QueryExecutionTree {
return rootOperation_->collectWarnings();
}

// TODO<joka921> Comment.
virtual std::vector<Operation*> getIndexScansForSortVariables(
std::span<const Variable> variables) final;

template <typename F>
void forAllDescendants(F f) {
static_assert(
Expand Down
2 changes: 1 addition & 1 deletion src/index/CompressedRelation.h
Original file line number Diff line number Diff line change
Expand Up @@ -466,7 +466,7 @@ class CompressedRelationReader {
// to be performed.
struct ScanSpecAndBlocks {
ScanSpecification scanSpec_;
const std::span<const CompressedBlockMetadata> blockMetadata_;
std::span<const CompressedBlockMetadata> blockMetadata_;
};

// This struct additionally contains the first and last triple of the scan
Expand Down

0 comments on commit 8a362a0

Please sign in to comment.