Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add DeltaTriplesManager #1603

Merged
merged 14 commits into from
Nov 8, 2024
7 changes: 4 additions & 3 deletions src/engine/CountAvailablePredicates.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -165,9 +165,10 @@
TripleComponent::Iri::fromIriref(HAS_PATTERN_PREDICATE), std::nullopt,
std::nullopt}
.toScanSpecification(index);
auto fullHasPattern = index.getPermutation(Permutation::Enum::PSO)
.lazyScan(scanSpec, std::nullopt, {},
cancellationHandle_, deltaTriples());
auto fullHasPattern =
index.getPermutation(Permutation::Enum::PSO)
.lazyScan(scanSpec, std::nullopt, {}, cancellationHandle_,
locatedTriplesSnapshot());

Check warning on line 171 in src/engine/CountAvailablePredicates.cpp

View check run for this annotation

Codecov / codecov/patch

src/engine/CountAvailablePredicates.cpp#L168-L171

Added lines #L168 - L171 were not covered by tests
for (const auto& idTable : fullHasPattern) {
for (const auto& patternId : idTable.getColumn(1)) {
AD_CORRECTNESS_CHECK(patternId.getDatatype() == Datatype::Int);
Expand Down
10 changes: 5 additions & 5 deletions src/engine/GroupBy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -665,7 +665,7 @@ std::optional<IdTable> GroupBy::computeGroupByObjectWithCount() const {
getExecutionContext()->getIndex().getPimpl().getPermutation(
indexScan->permutation());
auto result = permutation.getDistinctCol1IdsAndCounts(
col0Id.value(), cancellationHandle_, deltaTriples());
col0Id.value(), cancellationHandle_, locatedTriplesSnapshot());
indexScan->updateRuntimeInformationWhenOptimizedOut(
{}, RuntimeInformation::Status::optimizedOut);

Expand Down Expand Up @@ -717,8 +717,8 @@ std::optional<IdTable> GroupBy::computeGroupByForFullIndexScan() const {
const auto& permutation =
getExecutionContext()->getIndex().getPimpl().getPermutation(
permutationEnum.value());
auto table = permutation.getDistinctCol0IdsAndCounts(cancellationHandle_,
deltaTriples());
auto table = permutation.getDistinctCol0IdsAndCounts(
cancellationHandle_, locatedTriplesSnapshot());
if (numCounts == 0) {
table.setColumnSubset({{0}});
}
Expand Down Expand Up @@ -840,7 +840,7 @@ std::optional<IdTable> GroupBy::computeGroupByForJoinWithFullScan() const {
Id currentId = subresult->idTable()(0, columnIndex);
size_t currentCount = 0;
size_t currentCardinality =
index.getCardinality(currentId, permutation, deltaTriples());
index.getCardinality(currentId, permutation, locatedTriplesSnapshot());

auto pushRow = [&]() {
// If the count is 0 this means that the element with the `currentId`
Expand All @@ -863,7 +863,7 @@ std::optional<IdTable> GroupBy::computeGroupByForJoinWithFullScan() const {
// without the internally added triples, but that is not easy to
// retrieve right now.
currentCardinality =
index.getCardinality(id, permutation, deltaTriples());
index.getCardinality(id, permutation, locatedTriplesSnapshot());
}
currentCount += currentCardinality;
}
Expand Down
13 changes: 7 additions & 6 deletions src/engine/HasPredicateScan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -267,9 +267,10 @@ ProtoResult HasPredicateScan::computeResult(
TripleComponent::Iri::fromIriref(HAS_PATTERN_PREDICATE), std::nullopt,
std::nullopt}
.toScanSpecification(index);
auto hasPattern = index.getPermutation(Permutation::Enum::PSO)
.lazyScan(scanSpec, std::nullopt, {},
cancellationHandle_, deltaTriples());
auto hasPattern =
index.getPermutation(Permutation::Enum::PSO)
.lazyScan(scanSpec, std::nullopt, {}, cancellationHandle_,
locatedTriplesSnapshot());

auto getId = [this](const TripleComponent tc) {
std::optional<Id> id = tc.toValueId(getIndex().getVocab());
Expand Down Expand Up @@ -339,9 +340,9 @@ void HasPredicateScan::computeFreeO(
TripleComponent::Iri::fromIriref(HAS_PATTERN_PREDICATE), subjectAsId,
std::nullopt}
.toScanSpecification(index);
auto hasPattern =
index.getPermutation(Permutation::Enum::PSO)
.scan(std::move(scanSpec), {}, cancellationHandle_, deltaTriples());
auto hasPattern = index.getPermutation(Permutation::Enum::PSO)
.scan(std::move(scanSpec), {}, cancellationHandle_,
locatedTriplesSnapshot());
AD_CORRECTNESS_CHECK(hasPattern.numRows() <= 1);
for (Id patternId : hasPattern.getColumn(0)) {
const auto& pattern = patterns[patternId.getInt()];
Expand Down
12 changes: 6 additions & 6 deletions src/engine/IndexScan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ ProtoResult IndexScan::computeResult(bool requestLaziness) {
const auto& index = _executionContext->getIndex();
idTable =
index.scan(getScanSpecification(), permutation_, additionalColumns(),
cancellationHandle_, deltaTriples(), getLimit());
cancellationHandle_, locatedTriplesSnapshot(), getLimit());
AD_CORRECTNESS_CHECK(idTable.numColumns() == getResultWidth());
LOG(DEBUG) << "IndexScan result computation done.\n";
checkCancellation();
Expand All @@ -174,7 +174,7 @@ ProtoResult IndexScan::computeResult(bool requestLaziness) {
size_t IndexScan::computeSizeEstimate() const {
AD_CORRECTNESS_CHECK(_executionContext);
return getIndex().getResultSizeOfScan(getScanSpecification(), permutation_,
deltaTriples());
locatedTriplesSnapshot());
}

// _____________________________________________________________________________
Expand All @@ -195,7 +195,7 @@ void IndexScan::determineMultiplicities() {
return {1.0f};
} else if (numVariables_ == 2) {
return idx.getMultiplicities(*getPermutedTriple()[0], permutation_,
deltaTriples());
locatedTriplesSnapshot());
} else {
AD_CORRECTNESS_CHECK(numVariables_ == 3);
return idx.getMultiplicities(permutation_);
Expand Down Expand Up @@ -245,16 +245,16 @@ Permutation::IdTableGenerator IndexScan::getLazyScan(
.getImpl()
.getPermutation(permutation())
.lazyScan(getScanSpecification(), std::move(actualBlocks),
additionalColumns(), cancellationHandle_, deltaTriples(),
getLimit());
additionalColumns(), cancellationHandle_,
locatedTriplesSnapshot(), getLimit());
};

// ________________________________________________________________
std::optional<Permutation::MetadataAndBlocks> IndexScan::getMetadataForScan()
const {
const auto& index = getExecutionContext()->getIndex().getImpl();
return index.getPermutation(permutation())
.getMetadataAndBlocks(getScanSpecification(), deltaTriples());
.getMetadataAndBlocks(getScanSpecification(), locatedTriplesSnapshot());
};

// ________________________________________________________________
Expand Down
4 changes: 2 additions & 2 deletions src/engine/Operation.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ class Operation {

const Index& getIndex() const { return _executionContext->getIndex(); }

const DeltaTriples& deltaTriples() const {
return _executionContext->deltaTriples();
// Convenience accessor: forward to `locatedTriplesSnapshot()` of the
// `_executionContext` and return the result by const reference.
// NOTE(review): `_executionContext` is dereferenced without a check here;
// presumably callers guarantee that it is set — confirm.
const auto& locatedTriplesSnapshot() const {
return _executionContext->locatedTriplesSnapshot();
}

// Get a unique, not ambiguous string representation for a subtree.
Expand Down
25 changes: 15 additions & 10 deletions src/engine/QueryExecutionContext.h
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
// Copyright 2011, University of Freiburg,
// Chair of Algorithms and Data Structures.
// Author:
// 2011-2017 Björn Buchhold ([email protected])
// 2018- Johannes Kalmbach ([email protected])
// Copyright 2011 - 2024, University of Freiburg
// Chair of Algorithms and Data Structures
// Authors: Björn Buchhold <[email protected]> [2011 - 2017]
// Johannes Kalmbach <kalmbach@cs.uni-freiburg.de> [2017 - 2024]

#pragma once

Expand Down Expand Up @@ -92,7 +91,10 @@ class QueryExecutionContext {

[[nodiscard]] const Index& getIndex() const { return _index; }

const DeltaTriples& deltaTriples() const { return *deltaTriples_; }
// Return a const reference to the object that `sharedLocatedTriplesSnapshot_`
// points to. The `AD_CORRECTNESS_CHECK` makes sure that the pointer is not
// null before it is dereferenced.
const LocatedTriplesSnapshot& locatedTriplesSnapshot() const {
AD_CORRECTNESS_CHECK(sharedLocatedTriplesSnapshot_ != nullptr);
return *sharedLocatedTriplesSnapshot_;
}

void clearCacheUnpinnedOnly() { getQueryTreeCache().clearUnpinnedOnly(); }

Expand Down Expand Up @@ -123,10 +125,13 @@ class QueryExecutionContext {

private:
const Index& _index;
// TODO<joka921> This has to be stored externally once we properly support
// SPARQL UPDATE, currently it is just a stub to make the interface work.
std::shared_ptr<DeltaTriples> deltaTriples_{
std::make_shared<DeltaTriples>(_index)};

// When the `QueryExecutionContext` is constructed, get a stable read-only
// snapshot of the current (located) delta triples. These can then be used
// by the respective query without interfering with further incoming
// update operations.
SharedLocatedTriplesSnapshot sharedLocatedTriplesSnapshot_{
_index.deltaTriplesManager().getCurrentSnapshot()};
QueryResultCache* const _subtreeCache;
// allocators are copied but hold shared state
ad_utility::AllocatorWithLimit<Id> _allocator;
Expand Down
9 changes: 4 additions & 5 deletions src/engine/QueryExecutionTree.cpp
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
// Copyright 2015, University of Freiburg,
// Chair of Algorithms and Data Structures.
// Author:
// 2015-2017 Björn Buchhold ([email protected])
// 2018- Johannes Kalmbach ([email protected])
// Copyright 2015 - 2024, University of Freiburg
// Chair of Algorithms and Data Structures
// Authors: Björn Buchhold <[email protected]> [2015 - 2017]
// Johannes Kalmbach <kalmbach@cs.uni-freiburg.de> [2017 - 2024]

#include "./QueryExecutionTree.h"

Expand Down
8 changes: 5 additions & 3 deletions src/engine/QueryExecutionTree.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
// Copyright 2015, University of Freiburg,
// Chair of Algorithms and Data Structures.
// Author: Björn Buchhold ([email protected])
// Copyright 2015 - 2024, University of Freiburg
// Chair of Algorithms and Data Structures
// Authors: Björn Buchhold <[email protected]>
// Johannes Kalmbach <kalmbach@cs.uni-freiburg.de>

#pragma once

#include <memory>
Expand Down
62 changes: 51 additions & 11 deletions src/index/DeltaTriples.cpp
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
// Copyright 2023 - 2024, University of Freiburg
// Chair of Algorithms and Data Structures.
// Authors:
// 2023 Hannah Bast <bast@cs.uni-freiburg.de>
// 2024 Julian Mundhahs <mundhahj@tf.uni-freiburg.de>
// Chair of Algorithms and Data Structures
// Authors: Hannah Bast <bast@cs.uni-freiburg.de>
// Julian Mundhahs <mundhahj@tf.uni-freiburg.de>
// Johannes Kalmbach <kalmbach@cs.uni-freiburg.de>

#include "index/DeltaTriples.h"

Expand All @@ -21,8 +21,7 @@ LocatedTriples::iterator& DeltaTriples::LocatedTripleHandles::forPermutation(
void DeltaTriples::clear() {
triplesInserted_.clear();
triplesDeleted_.clear();
std::ranges::for_each(locatedTriplesPerBlock_,
&LocatedTriplesPerBlock::clear);
std::ranges::for_each(locatedTriples(), &LocatedTriplesPerBlock::clear);
}

// ____________________________________________________________________________
Expand All @@ -33,15 +32,15 @@ DeltaTriples::locateAndAddTriples(CancellationHandle cancellationHandle,
std::array<std::vector<LocatedTriples::iterator>, Permutation::ALL.size()>
intermediateHandles;
for (auto permutation : Permutation::ALL) {
auto& perm = index_.getImpl().getPermutation(permutation);
auto& perm = index_.getPermutation(permutation);
auto locatedTriples = LocatedTriple::locateTriplesInPermutation(
// TODO<qup42>: replace with `getAugmentedMetadata` once integration
// is done
idTriples, perm.metaData().blockData(), perm.keyOrder(), shouldExist,
cancellationHandle);
cancellationHandle->throwIfCancelled();
intermediateHandles[static_cast<size_t>(permutation)] =
locatedTriplesPerBlock_[static_cast<size_t>(permutation)].add(
this->locatedTriples()[static_cast<size_t>(permutation)].add(
locatedTriples);
cancellationHandle->throwIfCancelled();
}
Expand All @@ -60,8 +59,8 @@ void DeltaTriples::eraseTripleInAllPermutations(LocatedTripleHandles& handles) {
// Erase for all permutations.
for (auto permutation : Permutation::ALL) {
auto ltIter = handles.forPermutation(permutation);
locatedTriplesPerBlock_[static_cast<int>(permutation)].erase(
ltIter->blockIndex_, ltIter);
locatedTriples()[static_cast<int>(permutation)].erase(ltIter->blockIndex_,
ltIter);
}
}

Expand Down Expand Up @@ -172,7 +171,48 @@ void DeltaTriples::modifyTriplesImpl(CancellationHandle cancellationHandle,
}

// ____________________________________________________________________________
const LocatedTriplesPerBlock& DeltaTriples::getLocatedTriplesPerBlock(
const LocatedTriplesPerBlock&
LocatedTriplesSnapshot::getLocatedTriplesForPermutation(
    Permutation::Enum permutation) const {
  // The enum value of the permutation doubles as the position of its entry
  // in `locatedTriplesPerBlock_`.
  const auto permutationIndex = static_cast<int>(permutation);
  return locatedTriplesPerBlock_[permutationIndex];
}

// ____________________________________________________________________________
SharedLocatedTriplesSnapshot DeltaTriples::getSnapshot() const {
  // NOTE: The snapshot receives its own copies of both members. The located
  // triples are copied implicitly, whereas the `localVocab_` deliberately has
  // no copy constructor (to avoid accidental copies) and therefore has to be
  // duplicated via an explicit `clone`.
  auto snapshot = std::make_shared<LocatedTriplesSnapshot>(
      locatedTriples(), localVocab_.clone());
  return SharedLocatedTriplesSnapshot{std::move(snapshot)};
}

// ____________________________________________________________________________
// Convenience constructor: delegate to the constructor overload that accepts
// the result of `index.getImpl()`.
DeltaTriples::DeltaTriples(const Index& index)
: DeltaTriples(index.getImpl()) {}

// ____________________________________________________________________________
// Construct the manager: initialize `deltaTriples_` from the `index`, then
// obtain the initial value of `currentLocatedTriplesSnapshot_` by calling
// `getSnapshot()` on `deltaTriples_` through its `rlock()` handle.
// NOTE(review): the second initializer reads `deltaTriples_`, so this relies on
// `deltaTriples_` being declared before `currentLocatedTriplesSnapshot_` in
// the class definition — confirm in the header.
DeltaTriplesManager::DeltaTriplesManager(const IndexImpl& index)
: deltaTriples_{index},
currentLocatedTriplesSnapshot_{deltaTriples_.rlock()->getSnapshot()} {}

// _____________________________________________________________________________
void DeltaTriplesManager::modify(
    const std::function<void(DeltaTriples&)>& function) {
  // The whole update runs under the write lock of the underlying
  // `DeltaTriples`: first the user-supplied `function` is applied (typically
  // a combination of insert and delete operations), then, with that lock
  // still held, a fresh snapshot is taken and stored as the
  // `currentLocatedTriplesSnapshot_`.
  auto applyAndPublish = [this, &function](DeltaTriples& deltaTriples) {
    function(deltaTriples);
    auto updatedSnapshot = deltaTriples.getSnapshot();
    // Replace the currently published snapshot by the one just created.
    auto publish = [&updatedSnapshot](auto& publishedSnapshot) {
      publishedSnapshot = std::move(updatedSnapshot);
    };
    currentLocatedTriplesSnapshot_.withWriteLock(publish);
  };
  deltaTriples_.withWriteLock(applyAndPublish);
}

// _____________________________________________________________________________
SharedLocatedTriplesSnapshot DeltaTriplesManager::getCurrentSnapshot() const {
  // Hold the read-lock handle for the duration of the copy: the returned value
  // is copied from the handle's target before the handle goes out of scope.
  auto lockedSnapshot = currentLocatedTriplesSnapshot_.rlock();
  return *lockedSnapshot;
}
Loading
Loading