Skip to content

Commit

Permalink
Add DeltaTriplesManager (ad-freiburg#1603)
Browse files Browse the repository at this point in the history
The already existing `DeltaTriples` class maintains a dynamically changing set of insertions and deletions relative to the original input data, together with a (single) local vocab. The class is not threadsafe and has to be used with care. In particular, concurrent update queries have to be serialized, and while a query makes use of the "delta triples", it has to be made sure that they are not changed over the course of the processing of that query.

Both of these problems are solved by the new `DeltaTriplesManager` class. The index has a single object of this class. It maintains a single `DeltaTriples` object, write access to which is strictly serialized. Each new query gets a so-called *snapshot* of the current delta triples. This is a full copy (of the delta triples located in each of the permutations and of the local vocab). These snapshots are read-only and multiple queries can share the same snapshot. A snapshot lives as long as one query using it is still being processed.
  • Loading branch information
joka921 authored and ullingerc committed Nov 12, 2024
1 parent 028f591 commit 2a09ece
Show file tree
Hide file tree
Showing 22 changed files with 497 additions and 211 deletions.
7 changes: 4 additions & 3 deletions src/engine/CountAvailablePredicates.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -165,9 +165,10 @@ void CountAvailablePredicates::computePatternTrickAllEntities(
TripleComponent::Iri::fromIriref(HAS_PATTERN_PREDICATE), std::nullopt,
std::nullopt}
.toScanSpecification(index);
auto fullHasPattern = index.getPermutation(Permutation::Enum::PSO)
.lazyScan(scanSpec, std::nullopt, {},
cancellationHandle_, deltaTriples());
auto fullHasPattern =
index.getPermutation(Permutation::Enum::PSO)
.lazyScan(scanSpec, std::nullopt, {}, cancellationHandle_,
locatedTriplesSnapshot());
for (const auto& idTable : fullHasPattern) {
for (const auto& patternId : idTable.getColumn(1)) {
AD_CORRECTNESS_CHECK(patternId.getDatatype() == Datatype::Int);
Expand Down
10 changes: 5 additions & 5 deletions src/engine/GroupBy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -665,7 +665,7 @@ std::optional<IdTable> GroupBy::computeGroupByObjectWithCount() const {
getExecutionContext()->getIndex().getPimpl().getPermutation(
indexScan->permutation());
auto result = permutation.getDistinctCol1IdsAndCounts(
col0Id.value(), cancellationHandle_, deltaTriples());
col0Id.value(), cancellationHandle_, locatedTriplesSnapshot());
indexScan->updateRuntimeInformationWhenOptimizedOut(
{}, RuntimeInformation::Status::optimizedOut);

Expand Down Expand Up @@ -717,8 +717,8 @@ std::optional<IdTable> GroupBy::computeGroupByForFullIndexScan() const {
const auto& permutation =
getExecutionContext()->getIndex().getPimpl().getPermutation(
permutationEnum.value());
auto table = permutation.getDistinctCol0IdsAndCounts(cancellationHandle_,
deltaTriples());
auto table = permutation.getDistinctCol0IdsAndCounts(
cancellationHandle_, locatedTriplesSnapshot());
if (numCounts == 0) {
table.setColumnSubset({{0}});
}
Expand Down Expand Up @@ -840,7 +840,7 @@ std::optional<IdTable> GroupBy::computeGroupByForJoinWithFullScan() const {
Id currentId = subresult->idTable()(0, columnIndex);
size_t currentCount = 0;
size_t currentCardinality =
index.getCardinality(currentId, permutation, deltaTriples());
index.getCardinality(currentId, permutation, locatedTriplesSnapshot());

auto pushRow = [&]() {
// If the count is 0 this means that the element with the `currentId`
Expand All @@ -863,7 +863,7 @@ std::optional<IdTable> GroupBy::computeGroupByForJoinWithFullScan() const {
// without the internally added triples, but that is not easy to
// retrieve right now.
currentCardinality =
index.getCardinality(id, permutation, deltaTriples());
index.getCardinality(id, permutation, locatedTriplesSnapshot());
}
currentCount += currentCardinality;
}
Expand Down
13 changes: 7 additions & 6 deletions src/engine/HasPredicateScan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -267,9 +267,10 @@ ProtoResult HasPredicateScan::computeResult(
TripleComponent::Iri::fromIriref(HAS_PATTERN_PREDICATE), std::nullopt,
std::nullopt}
.toScanSpecification(index);
auto hasPattern = index.getPermutation(Permutation::Enum::PSO)
.lazyScan(scanSpec, std::nullopt, {},
cancellationHandle_, deltaTriples());
auto hasPattern =
index.getPermutation(Permutation::Enum::PSO)
.lazyScan(scanSpec, std::nullopt, {}, cancellationHandle_,
locatedTriplesSnapshot());

auto getId = [this](const TripleComponent tc) {
std::optional<Id> id = tc.toValueId(getIndex().getVocab());
Expand Down Expand Up @@ -339,9 +340,9 @@ void HasPredicateScan::computeFreeO(
TripleComponent::Iri::fromIriref(HAS_PATTERN_PREDICATE), subjectAsId,
std::nullopt}
.toScanSpecification(index);
auto hasPattern =
index.getPermutation(Permutation::Enum::PSO)
.scan(std::move(scanSpec), {}, cancellationHandle_, deltaTriples());
auto hasPattern = index.getPermutation(Permutation::Enum::PSO)
.scan(std::move(scanSpec), {}, cancellationHandle_,
locatedTriplesSnapshot());
AD_CORRECTNESS_CHECK(hasPattern.numRows() <= 1);
for (Id patternId : hasPattern.getColumn(0)) {
const auto& pattern = patterns[patternId.getInt()];
Expand Down
12 changes: 6 additions & 6 deletions src/engine/IndexScan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ ProtoResult IndexScan::computeResult(bool requestLaziness) {
const auto& index = _executionContext->getIndex();
idTable =
index.scan(getScanSpecification(), permutation_, additionalColumns(),
cancellationHandle_, deltaTriples(), getLimit());
cancellationHandle_, locatedTriplesSnapshot(), getLimit());
AD_CORRECTNESS_CHECK(idTable.numColumns() == getResultWidth());
LOG(DEBUG) << "IndexScan result computation done.\n";
checkCancellation();
Expand All @@ -174,7 +174,7 @@ ProtoResult IndexScan::computeResult(bool requestLaziness) {
size_t IndexScan::computeSizeEstimate() const {
AD_CORRECTNESS_CHECK(_executionContext);
return getIndex().getResultSizeOfScan(getScanSpecification(), permutation_,
deltaTriples());
locatedTriplesSnapshot());
}

// _____________________________________________________________________________
Expand All @@ -195,7 +195,7 @@ void IndexScan::determineMultiplicities() {
return {1.0f};
} else if (numVariables_ == 2) {
return idx.getMultiplicities(*getPermutedTriple()[0], permutation_,
deltaTriples());
locatedTriplesSnapshot());
} else {
AD_CORRECTNESS_CHECK(numVariables_ == 3);
return idx.getMultiplicities(permutation_);
Expand Down Expand Up @@ -245,16 +245,16 @@ Permutation::IdTableGenerator IndexScan::getLazyScan(
.getImpl()
.getPermutation(permutation())
.lazyScan(getScanSpecification(), std::move(actualBlocks),
additionalColumns(), cancellationHandle_, deltaTriples(),
getLimit());
additionalColumns(), cancellationHandle_,
locatedTriplesSnapshot(), getLimit());
};

// ________________________________________________________________
std::optional<Permutation::MetadataAndBlocks> IndexScan::getMetadataForScan()
const {
const auto& index = getExecutionContext()->getIndex().getImpl();
return index.getPermutation(permutation())
.getMetadataAndBlocks(getScanSpecification(), deltaTriples());
.getMetadataAndBlocks(getScanSpecification(), locatedTriplesSnapshot());
};

// ________________________________________________________________
Expand Down
4 changes: 2 additions & 2 deletions src/engine/Operation.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ class Operation {

const Index& getIndex() const { return _executionContext->getIndex(); }

const DeltaTriples& deltaTriples() const {
return _executionContext->deltaTriples();
const auto& locatedTriplesSnapshot() const {
return _executionContext->locatedTriplesSnapshot();
}

// Get a unique, not ambiguous string representation for a subtree.
Expand Down
25 changes: 15 additions & 10 deletions src/engine/QueryExecutionContext.h
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
// Copyright 2011, University of Freiburg,
// Chair of Algorithms and Data Structures.
// Author:
// 2011-2017 Björn Buchhold ([email protected])
// 2018- Johannes Kalmbach ([email protected])
// Copyright 2011 - 2024, University of Freiburg
// Chair of Algorithms and Data Structures
// Authors: Björn Buchhold <[email protected]> [2011 - 2017]
// Johannes Kalmbach <[email protected]> [2017 - 2024]

#pragma once

Expand Down Expand Up @@ -92,7 +91,10 @@ class QueryExecutionContext {

[[nodiscard]] const Index& getIndex() const { return _index; }

const DeltaTriples& deltaTriples() const { return *deltaTriples_; }
const LocatedTriplesSnapshot& locatedTriplesSnapshot() const {
AD_CORRECTNESS_CHECK(sharedLocatedTriplesSnapshot_ != nullptr);
return *sharedLocatedTriplesSnapshot_;
}

void clearCacheUnpinnedOnly() { getQueryTreeCache().clearUnpinnedOnly(); }

Expand Down Expand Up @@ -123,10 +125,13 @@ class QueryExecutionContext {

private:
const Index& _index;
// TODO<joka921> This has to be stored externally once we properly support
// SPARQL UPDATE, currently it is just a stub to make the interface work.
std::shared_ptr<DeltaTriples> deltaTriples_{
std::make_shared<DeltaTriples>(_index)};

// When the `QueryExecutionContext` is constructed, get a stable read-only
// snapshot of the current (located) delta triples. These can then be used
// by the respective query without interfering with further incoming
// update operations.
SharedLocatedTriplesSnapshot sharedLocatedTriplesSnapshot_{
_index.deltaTriplesManager().getCurrentSnapshot()};
QueryResultCache* const _subtreeCache;
// allocators are copied but hold shared state
ad_utility::AllocatorWithLimit<Id> _allocator;
Expand Down
9 changes: 4 additions & 5 deletions src/engine/QueryExecutionTree.cpp
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
// Copyright 2015, University of Freiburg,
// Chair of Algorithms and Data Structures.
// Author:
// 2015-2017 Björn Buchhold ([email protected])
// 2018- Johannes Kalmbach ([email protected])
// Copyright 2015 - 2024, University of Freiburg
// Chair of Algorithms and Data Structures
// Authors: Björn Buchhold <[email protected]> [2015 - 2017]
// Johannes Kalmbach <[email protected]> [2017 - 2024]

#include "./QueryExecutionTree.h"

Expand Down
8 changes: 5 additions & 3 deletions src/engine/QueryExecutionTree.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
// Copyright 2015, University of Freiburg,
// Chair of Algorithms and Data Structures.
// Author: Björn Buchhold ([email protected])
// Copyright 2015 - 2024, University of Freiburg
// Chair of Algorithms and Data Structures
// Authors: Björn Buchhold <[email protected]>
// Johannes Kalmbach <[email protected]>

#pragma once

#include <memory>
Expand Down
62 changes: 51 additions & 11 deletions src/index/DeltaTriples.cpp
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
// Copyright 2023 - 2024, University of Freiburg
// Chair of Algorithms and Data Structures.
// Authors:
// 2023 Hannah Bast <bast@cs.uni-freiburg.de>
// 2024 Julian Mundhahs <mundhahj@tf.uni-freiburg.de>
// Chair of Algorithms and Data Structures
// Authors: Hannah Bast <[email protected]>
// Julian Mundhahs <mundhahj@tf.uni-freiburg.de>
// Johannes Kalmbach <kalmbach@cs.uni-freiburg.de>

#include "index/DeltaTriples.h"

Expand All @@ -21,8 +21,7 @@ LocatedTriples::iterator& DeltaTriples::LocatedTripleHandles::forPermutation(
void DeltaTriples::clear() {
triplesInserted_.clear();
triplesDeleted_.clear();
std::ranges::for_each(locatedTriplesPerBlock_,
&LocatedTriplesPerBlock::clear);
std::ranges::for_each(locatedTriples(), &LocatedTriplesPerBlock::clear);
}

// ____________________________________________________________________________
Expand All @@ -33,15 +32,15 @@ DeltaTriples::locateAndAddTriples(CancellationHandle cancellationHandle,
std::array<std::vector<LocatedTriples::iterator>, Permutation::ALL.size()>
intermediateHandles;
for (auto permutation : Permutation::ALL) {
auto& perm = index_.getImpl().getPermutation(permutation);
auto& perm = index_.getPermutation(permutation);
auto locatedTriples = LocatedTriple::locateTriplesInPermutation(
// TODO<qup42>: replace with `getAugmentedMetadata` once integration
// is done
idTriples, perm.metaData().blockData(), perm.keyOrder(), shouldExist,
cancellationHandle);
cancellationHandle->throwIfCancelled();
intermediateHandles[static_cast<size_t>(permutation)] =
locatedTriplesPerBlock_[static_cast<size_t>(permutation)].add(
this->locatedTriples()[static_cast<size_t>(permutation)].add(
locatedTriples);
cancellationHandle->throwIfCancelled();
}
Expand All @@ -60,8 +59,8 @@ void DeltaTriples::eraseTripleInAllPermutations(LocatedTripleHandles& handles) {
// Erase for all permutations.
for (auto permutation : Permutation::ALL) {
auto ltIter = handles.forPermutation(permutation);
locatedTriplesPerBlock_[static_cast<int>(permutation)].erase(
ltIter->blockIndex_, ltIter);
locatedTriples()[static_cast<int>(permutation)].erase(ltIter->blockIndex_,
ltIter);
}
}

Expand Down Expand Up @@ -172,7 +171,48 @@ void DeltaTriples::modifyTriplesImpl(CancellationHandle cancellationHandle,
}

// ____________________________________________________________________________
const LocatedTriplesPerBlock& DeltaTriples::getLocatedTriplesPerBlock(
const LocatedTriplesPerBlock&
LocatedTriplesSnapshot::getLocatedTriplesForPermutation(
Permutation::Enum permutation) const {
return locatedTriplesPerBlock_[static_cast<int>(permutation)];
}

// ____________________________________________________________________________
SharedLocatedTriplesSnapshot DeltaTriples::getSnapshot() const {
// NOTE: Both members of the `LocatedTriplesSnapshot` are copied, but the
// `localVocab_` has no copy constructor (in order to avoid accidental
// copies), hence the explicit `clone`.
return SharedLocatedTriplesSnapshot{std::make_shared<LocatedTriplesSnapshot>(
locatedTriples(), localVocab_.clone())};
}

// ____________________________________________________________________________
DeltaTriples::DeltaTriples(const Index& index)
: DeltaTriples(index.getImpl()) {}

// ____________________________________________________________________________
DeltaTriplesManager::DeltaTriplesManager(const IndexImpl& index)
: deltaTriples_{index},
currentLocatedTriplesSnapshot_{deltaTriples_.rlock()->getSnapshot()} {}

// _____________________________________________________________________________
void DeltaTriplesManager::modify(
const std::function<void(DeltaTriples&)>& function) {
// While holding the lock for the underlying `DeltaTriples`, perform the
// actual `function` (typically some combination of insert and delete
// operations) and (while still holding the lock) update the
// `currentLocatedTriplesSnapshot_`.
deltaTriples_.withWriteLock([this, &function](DeltaTriples& deltaTriples) {
function(deltaTriples);
auto newSnapshot = deltaTriples.getSnapshot();
currentLocatedTriplesSnapshot_.withWriteLock(
[&newSnapshot](auto& currentSnapshot) {
currentSnapshot = std::move(newSnapshot);
});
});
}

// _____________________________________________________________________________
SharedLocatedTriplesSnapshot DeltaTriplesManager::getCurrentSnapshot() const {
return *currentLocatedTriplesSnapshot_.rlock();
}
Loading

0 comments on commit 2a09ece

Please sign in to comment.