Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add DeltaTriplesManager #1603

Merged
merged 14 commits into from
Nov 8, 2024
7 changes: 4 additions & 3 deletions src/engine/CountAvailablePredicates.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -165,9 +165,10 @@ void CountAvailablePredicates::computePatternTrickAllEntities(
TripleComponent::Iri::fromIriref(HAS_PATTERN_PREDICATE), std::nullopt,
std::nullopt}
.toScanSpecification(index);
auto fullHasPattern = index.getPermutation(Permutation::Enum::PSO)
.lazyScan(scanSpec, std::nullopt, {},
cancellationHandle_, deltaTriples());
auto fullHasPattern =
index.getPermutation(Permutation::Enum::PSO)
.lazyScan(scanSpec, std::nullopt, {}, cancellationHandle_,
locatedTriplesSnapshot());
for (const auto& idTable : fullHasPattern) {
for (const auto& patternId : idTable.getColumn(1)) {
AD_CORRECTNESS_CHECK(patternId.getDatatype() == Datatype::Int);
Expand Down
10 changes: 5 additions & 5 deletions src/engine/GroupBy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -665,7 +665,7 @@ std::optional<IdTable> GroupBy::computeGroupByObjectWithCount() const {
getExecutionContext()->getIndex().getPimpl().getPermutation(
indexScan->permutation());
auto result = permutation.getDistinctCol1IdsAndCounts(
col0Id.value(), cancellationHandle_, deltaTriples());
col0Id.value(), cancellationHandle_, locatedTriplesSnapshot());
indexScan->updateRuntimeInformationWhenOptimizedOut(
{}, RuntimeInformation::Status::optimizedOut);

Expand Down Expand Up @@ -717,8 +717,8 @@ std::optional<IdTable> GroupBy::computeGroupByForFullIndexScan() const {
const auto& permutation =
getExecutionContext()->getIndex().getPimpl().getPermutation(
permutationEnum.value());
auto table = permutation.getDistinctCol0IdsAndCounts(cancellationHandle_,
deltaTriples());
auto table = permutation.getDistinctCol0IdsAndCounts(
cancellationHandle_, locatedTriplesSnapshot());
if (numCounts == 0) {
table.setColumnSubset({{0}});
}
Expand Down Expand Up @@ -840,7 +840,7 @@ std::optional<IdTable> GroupBy::computeGroupByForJoinWithFullScan() const {
Id currentId = subresult->idTable()(0, columnIndex);
size_t currentCount = 0;
size_t currentCardinality =
index.getCardinality(currentId, permutation, deltaTriples());
index.getCardinality(currentId, permutation, locatedTriplesSnapshot());

auto pushRow = [&]() {
// If the count is 0 this means that the element with the `currentId`
Expand All @@ -863,7 +863,7 @@ std::optional<IdTable> GroupBy::computeGroupByForJoinWithFullScan() const {
// without the internally added triples, but that is not easy to
// retrieve right now.
currentCardinality =
index.getCardinality(id, permutation, deltaTriples());
index.getCardinality(id, permutation, locatedTriplesSnapshot());
}
currentCount += currentCardinality;
}
Expand Down
13 changes: 7 additions & 6 deletions src/engine/HasPredicateScan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -267,9 +267,10 @@ ProtoResult HasPredicateScan::computeResult(
TripleComponent::Iri::fromIriref(HAS_PATTERN_PREDICATE), std::nullopt,
std::nullopt}
.toScanSpecification(index);
auto hasPattern = index.getPermutation(Permutation::Enum::PSO)
.lazyScan(scanSpec, std::nullopt, {},
cancellationHandle_, deltaTriples());
auto hasPattern =
index.getPermutation(Permutation::Enum::PSO)
.lazyScan(scanSpec, std::nullopt, {}, cancellationHandle_,
locatedTriplesSnapshot());

auto getId = [this](const TripleComponent tc) {
std::optional<Id> id = tc.toValueId(getIndex().getVocab());
Expand Down Expand Up @@ -339,9 +340,9 @@ void HasPredicateScan::computeFreeO(
TripleComponent::Iri::fromIriref(HAS_PATTERN_PREDICATE), subjectAsId,
std::nullopt}
.toScanSpecification(index);
auto hasPattern =
index.getPermutation(Permutation::Enum::PSO)
.scan(std::move(scanSpec), {}, cancellationHandle_, deltaTriples());
auto hasPattern = index.getPermutation(Permutation::Enum::PSO)
.scan(std::move(scanSpec), {}, cancellationHandle_,
locatedTriplesSnapshot());
AD_CORRECTNESS_CHECK(hasPattern.numRows() <= 1);
for (Id patternId : hasPattern.getColumn(0)) {
const auto& pattern = patterns[patternId.getInt()];
Expand Down
12 changes: 6 additions & 6 deletions src/engine/IndexScan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ ProtoResult IndexScan::computeResult(bool requestLaziness) {
const auto& index = _executionContext->getIndex();
idTable =
index.scan(getScanSpecification(), permutation_, additionalColumns(),
cancellationHandle_, deltaTriples(), getLimit());
cancellationHandle_, locatedTriplesSnapshot(), getLimit());
AD_CORRECTNESS_CHECK(idTable.numColumns() == getResultWidth());
LOG(DEBUG) << "IndexScan result computation done.\n";
checkCancellation();
Expand All @@ -174,7 +174,7 @@ ProtoResult IndexScan::computeResult(bool requestLaziness) {
size_t IndexScan::computeSizeEstimate() const {
AD_CORRECTNESS_CHECK(_executionContext);
return getIndex().getResultSizeOfScan(getScanSpecification(), permutation_,
deltaTriples());
locatedTriplesSnapshot());
}

// _____________________________________________________________________________
Expand All @@ -195,7 +195,7 @@ void IndexScan::determineMultiplicities() {
return {1.0f};
} else if (numVariables_ == 2) {
return idx.getMultiplicities(*getPermutedTriple()[0], permutation_,
deltaTriples());
locatedTriplesSnapshot());
} else {
AD_CORRECTNESS_CHECK(numVariables_ == 3);
return idx.getMultiplicities(permutation_);
Expand Down Expand Up @@ -245,16 +245,16 @@ Permutation::IdTableGenerator IndexScan::getLazyScan(
.getImpl()
.getPermutation(permutation())
.lazyScan(getScanSpecification(), std::move(actualBlocks),
additionalColumns(), cancellationHandle_, deltaTriples(),
getLimit());
additionalColumns(), cancellationHandle_,
locatedTriplesSnapshot(), getLimit());
};

// ________________________________________________________________
std::optional<Permutation::MetadataAndBlocks> IndexScan::getMetadataForScan()
const {
const auto& index = getExecutionContext()->getIndex().getImpl();
return index.getPermutation(permutation())
.getMetadataAndBlocks(getScanSpecification(), deltaTriples());
.getMetadataAndBlocks(getScanSpecification(), locatedTriplesSnapshot());
};

// ________________________________________________________________
Expand Down
4 changes: 2 additions & 2 deletions src/engine/Operation.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ class Operation {

const Index& getIndex() const { return _executionContext->getIndex(); }

const DeltaTriples& deltaTriples() const {
return _executionContext->deltaTriples();
const auto& locatedTriplesSnapshot() const {
return _executionContext->locatedTriplesSnapshot();
}

// Get a unique, not ambiguous string representation for a subtree.
Expand Down
9 changes: 6 additions & 3 deletions src/engine/QueryExecutionContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,10 @@ class QueryExecutionContext {

[[nodiscard]] const Index& getIndex() const { return _index; }

const DeltaTriples& deltaTriples() const { return *deltaTriples_; }
const LocatedTriplesSnapshot& locatedTriplesSnapshot() const {
AD_CORRECTNESS_CHECK(sharedLocatedTriplesSnapshot != nullptr);
return *sharedLocatedTriplesSnapshot;
}

void clearCacheUnpinnedOnly() { getQueryTreeCache().clearUnpinnedOnly(); }

Expand Down Expand Up @@ -125,8 +128,8 @@ class QueryExecutionContext {
const Index& _index;
// TODO<joka921> This has to be stored externally once we properly support
// SPARQL UPDATE, currently it is just a stub to make the interface work.
std::shared_ptr<DeltaTriples> deltaTriples_{
std::make_shared<DeltaTriples>(_index)};
SharedLocatedTriplesSnapshot sharedLocatedTriplesSnapshot{
_index.deltaTriplesManager().getSnapshot()};
QueryResultCache* const _subtreeCache;
// allocators are copied but hold shared state
ad_utility::AllocatorWithLimit<Id> _allocator;
Expand Down
43 changes: 38 additions & 5 deletions src/index/DeltaTriples.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ LocatedTriples::iterator& DeltaTriples::LocatedTripleHandles::forPermutation(
void DeltaTriples::clear() {
triplesInserted_.clear();
triplesDeleted_.clear();
std::ranges::for_each(locatedTriplesPerBlock_,
std::ranges::for_each(locatedTriplesPerBlock(),
&LocatedTriplesPerBlock::clear);
}

Expand All @@ -33,15 +33,15 @@ DeltaTriples::locateAndAddTriples(CancellationHandle cancellationHandle,
std::array<std::vector<LocatedTriples::iterator>, Permutation::ALL.size()>
intermediateHandles;
for (auto permutation : Permutation::ALL) {
auto& perm = index_.getImpl().getPermutation(permutation);
auto& perm = index_.getPermutation(permutation);
auto locatedTriples = LocatedTriple::locateTriplesInPermutation(
// TODO<qup42>: replace with `getAugmentedMetadata` once integration
// is done
idTriples, perm.metaData().blockData(), perm.keyOrder(), shouldExist,
cancellationHandle);
cancellationHandle->throwIfCancelled();
intermediateHandles[static_cast<size_t>(permutation)] =
locatedTriplesPerBlock_[static_cast<size_t>(permutation)].add(
locatedTriplesPerBlock()[static_cast<size_t>(permutation)].add(
locatedTriples);
cancellationHandle->throwIfCancelled();
}
Expand All @@ -60,7 +60,7 @@ void DeltaTriples::eraseTripleInAllPermutations(LocatedTripleHandles& handles) {
// Erase for all permutations.
for (auto permutation : Permutation::ALL) {
auto ltIter = handles.forPermutation(permutation);
locatedTriplesPerBlock_[static_cast<int>(permutation)].erase(
locatedTriplesPerBlock()[static_cast<int>(permutation)].erase(
ltIter->blockIndex_, ltIter);
}
}
Expand Down Expand Up @@ -172,7 +172,40 @@ void DeltaTriples::modifyTriplesImpl(CancellationHandle cancellationHandle,
}

// ____________________________________________________________________________
const LocatedTriplesPerBlock& DeltaTriples::getLocatedTriplesPerBlock(
const LocatedTriplesPerBlock& LocatedTriplesSnapshot::getLocatedTriplesPerBlock(
Permutation::Enum permutation) const {
return locatedTriplesPerBlock_[static_cast<int>(permutation)];
}

// ____________________________________________________________________________
SharedLocatedTriplesSnapshot DeltaTriples::copySnapshot() const {
return std::make_shared<LocatedTriplesSnapshot>(locatedTriplesPerBlock(),
localVocab_.clone());
}

// ____________________________________________________________________________
DeltaTriples::DeltaTriples(const Index& index)
: DeltaTriples(index.getImpl()) {}

// ____________________________________________________________________________
DeltaTriplesManager::DeltaTriplesManager(const IndexImpl& index)
: deltaTriples_{index},
currentLocatedTriplesSnapshot_{deltaTriples_.rlock()->copySnapshot()} {}

// _____________________________________________________________________________
void DeltaTriplesManager::modify(std::function<void(DeltaTriples&)> function) {
// While holding the lock for the underlying `DeltaTriples`, perform the
// actual update (insert or delete) and (while still holding the lock) update
// the `currentLocatedTriplesPerBlock`.
deltaTriples_.withWriteLock([this, &function](DeltaTriples& deltaTriples) {
function(deltaTriples);
currentLocatedTriplesSnapshot_.withWriteLock([&deltaTriples](auto& ptr) {
ptr = deltaTriples.copySnapshot();
});
});
}

// _____________________________________________________________________________
SharedLocatedTriplesSnapshot DeltaTriplesManager::getSnapshot() const {
return *currentLocatedTriplesSnapshot_.rlock();
}
85 changes: 74 additions & 11 deletions src/index/DeltaTriples.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,29 @@
#include "index/IndexBuilderTypes.h"
#include "index/LocatedTriples.h"
#include "index/Permutation.h"
#include "util/Synchronized.h"

// Typedef for one `LocatedTriplesPerBlock` object for each of the six
// permutations.
using LocatedTriplesPerBlockAllPermutations =
std::array<LocatedTriplesPerBlock, Permutation::ALL.size()>;

// The locations of a set of delta triples (triples that were inserted or
// deleted since the index was built) in each of the six permutations, and a
// local vocab. This is all the information that is required to perform a query
// that correctly respects these delta triples, hence the name.
struct LocatedTriplesSnapshot {
LocatedTriplesPerBlockAllPermutations locatedTriplesPerBlock_;
LocalVocab localVocab_;
// Get `TripleWithPosition` objects for given permutation.
const LocatedTriplesPerBlock& getLocatedTriplesPerBlock(
Permutation::Enum permutation) const;
};

// A `shared_ptr` to a const `LocatedTriplesSnapshot`, but as an explicit class,
// s.t. it can be forward-declared.
class SharedLocatedTriplesSnapshot
: public std::shared_ptr<const LocatedTriplesSnapshot> {};

// A class for maintaining triples that are inserted or deleted after index
// building, we call these delta triples. How it works in principle:
Expand All @@ -33,9 +56,16 @@ class DeltaTriples {
FRIEND_TEST(DeltaTriplesTest, clear);
FRIEND_TEST(DeltaTriplesTest, addTriplesToLocalVocab);

public:
using Triples = std::vector<IdTriple<0>>;
using CancellationHandle = ad_utility::SharedCancellationHandle;

private:
// The index to which these triples are added.
const Index& index_;
const IndexImpl& index_;

// The located triples for all the 6 permutations.
LocatedTriplesPerBlockAllPermutations locatedTriplesPerBlock_;

// The local vocabulary of the delta triples (they may have components,
// which are not contained in the vocabulary of the original index).
Expand All @@ -52,10 +82,6 @@ class DeltaTriples {
static_assert(static_cast<int>(Permutation::Enum::OSP) == 5);
static_assert(Permutation::ALL.size() == 6);

// The positions of the delta triples in each of the six permutations.
std::array<LocatedTriplesPerBlock, Permutation::ALL.size()>
locatedTriplesPerBlock_;

// Each delta triple needs to know where it is stored in each of the six
// `LocatedTriplesPerBlock` above.
struct LocatedTripleHandles {
Expand All @@ -66,8 +92,6 @@ class DeltaTriples {
};
using TriplesToHandlesMap =
ad_utility::HashMap<IdTriple<0>, LocatedTripleHandles>;
using Triples = std::vector<IdTriple<0>>;
using CancellationHandle = ad_utility::SharedCancellationHandle;

// The sets of triples added to and subtracted from the original index. Any
// triple can be at most in one of the sets. The information whether a triple
Expand All @@ -78,15 +102,26 @@ class DeltaTriples {

public:
// Construct for given index.
explicit DeltaTriples(const Index& index) : index_(index) {}
explicit DeltaTriples(const Index& index);
explicit DeltaTriples(const IndexImpl& index) : index_{index} {};

DeltaTriples(const DeltaTriples&) = delete;
DeltaTriples& operator=(const DeltaTriples&) = delete;

// Get the common `LocalVocab` of the delta triples.
private:
LocalVocab& localVocab() { return localVocab_; }
auto& locatedTriplesPerBlock() { return locatedTriplesPerBlock_; }
const auto& locatedTriplesPerBlock() const { return locatedTriplesPerBlock_; }

public:
const LocalVocab& localVocab() const { return localVocab_; }

const LocatedTriplesPerBlock& getLocatedTriplesPerBlock(
Permutation::Enum permutation) const {
return locatedTriplesPerBlock.at(static_cast<size_t>(permutation));
}

// Clear `triplesAdded_` and `triplesSubtracted_` and all associated data
// structures.
void clear();
Expand All @@ -101,9 +136,10 @@ class DeltaTriples {
// Delete triples.
void deleteTriples(CancellationHandle cancellationHandle, Triples triples);

// Get `TripleWithPosition` objects for given permutation.
const LocatedTriplesPerBlock& getLocatedTriplesPerBlock(
Permutation::Enum permutation) const;
// Return a deep copy of the `LocatedTriples` and the corresponding
// `LocalVocab` which form a snapshot of the current status of this
// `DeltaTriples` object.
SharedLocatedTriplesSnapshot copySnapshot() const;

private:
// Find the position of the given triple in the given permutation and add it
Expand Down Expand Up @@ -144,6 +180,33 @@ class DeltaTriples {
void eraseTripleInAllPermutations(LocatedTripleHandles& handles);
};

// This class synchronizes the access to a `DeltaTriples` object, thus avoiding
// race conditions between concurrent updates and queries.
class DeltaTriplesManager {
ad_utility::Synchronized<DeltaTriples> deltaTriples_;
ad_utility::Synchronized<SharedLocatedTriplesSnapshot, std::shared_mutex>
currentLocatedTriplesSnapshot_;

public:
using CancellationHandle = DeltaTriples::CancellationHandle;
using Triples = DeltaTriples::Triples;

explicit DeltaTriplesManager(const IndexImpl& index);
FRIEND_TEST(DeltaTriplesTest, DeltaTriplesManager);

// Modify the underlying `DeltaTriples` by applying the `function` to them.
// Then update the current snapshot, s.t. subsequent calls to `getSnapshot`
// will observe the modifications. All this is done in a thread-safe way,
// meaning that there can be only one call to `modify` at the same time.
void modify(std::function<void(DeltaTriples&)> function);

// Return a `SharedLocatedTriplesSnapshot` that contains a deep copy of the
// state of the underlying `DeltaTriples` after the last completed UPDATE, and
// thus is not affected by future UPDATE requests. It can therefore be used to
// execute a query in a consistent way.
SharedLocatedTriplesSnapshot getSnapshot() const;
};

// DELTA TRIPLES AND THE CACHE
//
// Changes to the DeltaTriples invalidate all cache results that have an index
Expand Down
Loading
Loading