Skip to content

Commit

Permalink
Address PR comments
Browse files Browse the repository at this point in the history
  • Loading branch information
RobinTF committed Nov 21, 2024
1 parent 1b55f45 commit b9b7cae
Show file tree
Hide file tree
Showing 2 changed files with 148 additions and 148 deletions.
267 changes: 127 additions & 140 deletions src/engine/Join.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,26 @@ cppcoro::generator<ad_utility::IdTableAndFirstCol<IdTable>> convertGenerator(
co_yield t;
}
}

// Wrap a fully materialized result in a `IdTableAndFirstCol` and an array.
std::array<ad_utility::IdTableAndFirstCol<IdTableView<0>>, 1> asSingleTableView(
const Result& result, const std::vector<ColumnIndex>& permutation) {
return std::array{ad_utility::IdTableAndFirstCol{
result.idTable().asColumnSubsetView(permutation),
result.getCopyOfLocalVocab()}};
}

// Wrap a result either in an array with a single element or in a range wrapping
// the lazy result generator.
std::variant<decltype(convertGenerator(std::declval<Result::Generator>())),
std::array<ad_utility::IdTableAndFirstCol<IdTableView<0>>, 1>>
resultToRange(const Result& result,
const std::vector<ColumnIndex>& permutation) {
if (result.isFullyMaterialized()) {
return asSingleTableView(result, permutation);
}
return convertGenerator(std::move(result.idTables()), permutation);
}
} // namespace

// _____________________________________________________________________________
Expand Down Expand Up @@ -114,8 +134,7 @@ ProtoResult Join::computeResult(bool requestLaziness) {
if (_left->knownEmptyResult() || _right->knownEmptyResult()) {
_left->getRootOperation()->updateRuntimeInformationWhenOptimizedOut();
_right->getRootOperation()->updateRuntimeInformationWhenOptimizedOut();
return {IdTable{getResultWidth(), getExecutionContext()->getAllocator()},
resultSortedOn(), LocalVocab()};
return createEmptyResult();
}

// Always materialize results that meet one of the following criteria:
Expand Down Expand Up @@ -165,11 +184,7 @@ ProtoResult Join::computeResult(bool requestLaziness) {
checkCancellation();
if (leftRes->isFullyMaterialized() && leftRes->idTable().empty()) {
_right->getRootOperation()->updateRuntimeInformationWhenOptimizedOut();
// TODO<joka921, hannahbast, SPARQL update> When we add triples to the
// index, the vocabularies of index scans will not necessarily be empty and
// we need a mechanism to still retrieve them when using the lazy scan.
return {IdTable{getResultWidth(), getExecutionContext()->getAllocator()},
resultSortedOn(), LocalVocab()};
return createEmptyResult();
}

// Note: If only one of the children is a scan, then we have made sure in the
Expand All @@ -186,15 +201,8 @@ ProtoResult Join::computeResult(bool requestLaziness) {
rightResIfCached ? rightResIfCached : _right->getResult(true);
checkCancellation();
if (leftRes->isFullyMaterialized() && rightRes->isFullyMaterialized()) {
IdTable idTable{getResultWidth(), getExecutionContext()->getAllocator()};
join(leftRes->idTable(), _leftJoinCol, rightRes->idTable(), _rightJoinCol,
&idTable);
checkCancellation();

// If only one of the two operands has a non-empty local vocabulary, share
// with that one (otherwise, throws an exception).
return {std::move(idTable), resultSortedOn(),
Result::getMergedLocalVocab(*leftRes, *rightRes)};
return computeResultForTwoMaterializedInputs(std::move(leftRes),
std::move(rightRes));
}
return lazyJoin(std::move(leftRes), _leftJoinCol, std::move(rightRes),
_rightJoinCol, requestLaziness);
Expand Down Expand Up @@ -409,6 +417,65 @@ void Join::join(const IdTable& a, ColumnIndex jc1, const IdTable& b,
<< ", size = " << result->size() << "\n";
}

// _____________________________________________________________________________
Result::Generator Join::yieldOnCallbackCalled(
ad_utility::InvocableWithExactReturnType<
Result::IdTableVocabPair,
std::function<void(IdTable&, LocalVocab&)>> auto action,
std::vector<ColumnIndex> permutation) const {
std::atomic_flag write = true;
std::variant<std::monostate, Result::IdTableVocabPair, std::exception_ptr>
storage;
ad_utility::JThread thread{[&write, &storage, &action, &permutation]() {
auto writeValue = [&write, &storage](auto value) noexcept {
storage = std::move(value);
write.clear();
write.notify_one();
};
auto writeValueAndWait = [&permutation, &write,
&writeValue](Result::IdTableVocabPair value) {
AD_CORRECTNESS_CHECK(write.test());
if (!permutation.empty()) {
value.idTable_.setColumnSubset(permutation);
}
writeValue(std::move(value));
// Wait until we are allowed to write again.
write.wait(false);
};
auto addValue = [&writeValueAndWait](IdTable& idTable,
LocalVocab& localVocab) {
if (idTable.size() < CHUNK_SIZE) {
return;
}
writeValueAndWait(
Result::IdTableVocabPair{std::move(idTable), std::move(localVocab)});
};
try {
auto finalValue = action(addValue);
if (!finalValue.idTable_.empty()) {
writeValueAndWait(std::move(finalValue));
}
writeValue(std::monostate{});
} catch (...) {
writeValue(std::current_exception());
}
}};
while (true) {
// Wait for read phase.
write.wait(true);
if (std::holds_alternative<std::monostate>(storage)) {
break;
}
if (std::holds_alternative<std::exception_ptr>(storage)) {
std::rethrow_exception(std::get<std::exception_ptr>(storage));
}
co_yield std::get<Result::IdTableVocabPair>(storage);
// Initiate write phase.
write.test_and_set();
write.notify_one();
}
}

// _____________________________________________________________________________
ProtoResult Join::createResult(
bool requestedLaziness,
Expand All @@ -417,64 +484,7 @@ ProtoResult Join::createResult(
std::function<void(IdTable&, LocalVocab&)>> auto action,
std::vector<ColumnIndex> permutation) const {
if (requestedLaziness) {
return {[](auto innerAction, auto innerPermutation) -> Result::Generator {
std::atomic_flag write = true;
std::variant<std::monostate, Result::IdTableVocabPair,
std::exception_ptr>
storage;
ad_utility::JThread thread{[&write, &storage, &innerAction,
&innerPermutation]() {
auto writeValue = [&write, &storage](auto value) noexcept {
storage = std::move(value);
write.clear();
write.notify_one();
};
auto addValue = [&write, &writeValue, &innerPermutation](
IdTable& idTable, LocalVocab& localVocab) {
AD_CORRECTNESS_CHECK(write.test());
if (idTable.size() < CHUNK_SIZE) {
return;
}
Result::IdTableVocabPair pair{std::move(idTable),
std::move(localVocab)};
if (!innerPermutation.empty()) {
pair.idTable_.setColumnSubset(innerPermutation);
}
writeValue(std::move(pair));
// Wait until we are allowed to write again.
write.wait(false);
};
try {
auto finalValue = innerAction(addValue);
AD_CORRECTNESS_CHECK(write.test());
if (!finalValue.idTable_.empty()) {
if (!innerPermutation.empty()) {
finalValue.idTable_.setColumnSubset(innerPermutation);
}
writeValue(std::move(finalValue));
// Wait until we are allowed to write again.
write.wait(false);
}
writeValue(std::monostate{});
} catch (...) {
writeValue(std::current_exception());
}
}};
while (true) {
// Wait for read phase.
write.wait(true);
if (std::holds_alternative<std::monostate>(storage)) {
break;
}
if (std::holds_alternative<std::exception_ptr>(storage)) {
std::rethrow_exception(std::get<std::exception_ptr>(storage));
}
co_yield std::get<Result::IdTableVocabPair>(storage);
// Initiate write phase.
write.test_and_set();
write.notify_one();
}
}(std::move(action), std::move(permutation)),
return {yieldOnCallbackCalled(std::move(action), std::move(permutation)),
resultSortedOn()};
} else {
auto [idTable, localVocab] = action(ad_utility::noop);
Expand All @@ -485,20 +495,6 @@ ProtoResult Join::createResult(
}
}

// _____________________________________________________________________________
bool Join::couldContainUndef(const auto& blocks, const auto& tree,
ColumnIndex joinColumn) {
if constexpr (std::ranges::random_access_range<decltype(blocks)>) {
AD_CORRECTNESS_CHECK(!std::ranges::empty(blocks));
return !blocks[0].empty() && blocks[0][0].isUndefined();
} else {
auto undefStatus = tree->getVariableAndInfoByColumnIndex(joinColumn)
.second.mightContainUndef_;
return undefStatus ==
ColumnIndexAndTypeInfo::UndefStatus::PossiblyUndefined;
}
}

// ______________________________________________________________________________
ProtoResult Join::lazyJoin(std::shared_ptr<const Result> a, ColumnIndex jc1,
std::shared_ptr<const Result> b, ColumnIndex jc2,
Expand All @@ -513,43 +509,18 @@ ProtoResult Join::lazyJoin(std::shared_ptr<const Result> a, ColumnIndex jc1,
auto resultPermutation = joinColMap.permutationResult();
return createResult(
requestLaziness,
[this, a = std::move(a), jc1, b = std::move(b), jc2,
[this, a = std::move(a), b = std::move(b),
joinColMap = std::move(joinColMap)](
std::function<void(IdTable&, LocalVocab&)> yieldTable) {
ad_utility::AddCombinedRowToIdTable rowAdder{
1, IdTable{getResultWidth(), getExecutionContext()->getAllocator()},
cancellationHandle_, CHUNK_SIZE, std::move(yieldTable)};
auto asSingleTableView =
[](const Result& result,
const std::vector<ColumnIndex>& permutation) {
return std::array{ad_utility::IdTableAndFirstCol{
result.idTable().asColumnSubsetView(permutation),
result.getCopyOfLocalVocab()}};
};
using Blocks = std::variant<
cppcoro::generator<ad_utility::IdTableAndFirstCol<IdTable>>,
std::array<ad_utility::IdTableAndFirstCol<IdTableView<0>>, 1>>;
auto leftRange =
a->isFullyMaterialized()
? Blocks{asSingleTableView(*a, joinColMap.permutationLeft())}
: Blocks{convertGenerator(std::move(a->idTables()),
joinColMap.permutationLeft())};
auto rightRange =
b->isFullyMaterialized()
? Blocks{asSingleTableView(*b, joinColMap.permutationRight())}
: Blocks{convertGenerator(std::move(b->idTables()),
joinColMap.permutationRight())};
1, IdTable{getResultWidth(), allocator()}, cancellationHandle_,
CHUNK_SIZE, std::move(yieldTable)};
auto leftRange = resultToRange(*a, joinColMap.permutationLeft());
auto rightRange = resultToRange(*b, joinColMap.permutationRight());
std::visit(
[this, &rowAdder, jc1, jc2](auto& leftBlocks, auto& rightBlocks) {
bool containsUndef = couldContainUndef(leftBlocks, _left, jc1) ||
couldContainUndef(rightBlocks, _right, jc2);
if (containsUndef) {
ad_utility::zipperJoinForBlocksWithPotentialUndef(
leftBlocks, rightBlocks, std::less{}, rowAdder);
} else {
ad_utility::zipperJoinForBlocksWithoutUndef(
leftBlocks, rightBlocks, std::less{}, rowAdder);
}
[&rowAdder](auto& leftBlocks, auto& rightBlocks) {
ad_utility::zipperJoinForBlocksWithPotentialUndef(
leftBlocks, rightBlocks, std::less{}, rowAdder);
},
leftRange, rightRange);
auto localVocab = std::move(rowAdder.localVocab());
Expand Down Expand Up @@ -668,7 +639,7 @@ void Join::hashJoin(const IdTable& dynA, ColumnIndex jc1, const IdTable& dynB,
// ___________________________________________________________________________
template <typename ROW_A, typename ROW_B, int TABLE_WIDTH>
void Join::addCombinedRowToIdTable(const ROW_A& rowA, const ROW_B& rowB,
const ColumnIndex jcRowB,
ColumnIndex jcRowB,
IdTableStatic<TABLE_WIDTH>* table) {
// Add a new, empty row.
const size_t backIndex = table->size();
Expand Down Expand Up @@ -748,8 +719,8 @@ ProtoResult Join::computeResultForTwoIndexScans(bool requestLaziness) const {
// `AddCombinedRowToIdTable` class to work correctly.
AD_CORRECTNESS_CHECK(_leftJoinCol == 0 && _rightJoinCol == 0);
ad_utility::AddCombinedRowToIdTable rowAdder{
1, IdTable{getResultWidth(), getExecutionContext()->getAllocator()},
cancellationHandle_, CHUNK_SIZE, std::move(yieldTable)};
1, IdTable{getResultWidth(), allocator()}, cancellationHandle_,
CHUNK_SIZE, std::move(yieldTable)};

ad_utility::Timer timer{
ad_utility::timer::Timer::InitialStatus::Started};
Expand Down Expand Up @@ -803,8 +774,8 @@ ProtoResult Join::computeResultForIndexScanAndIdTable(
std::function<void(IdTable&, LocalVocab&)> yieldTable) {
const IdTable& idTable = resultWithIdTable->idTable();
ad_utility::AddCombinedRowToIdTable rowAdder{
1, IdTable{getResultWidth(), getExecutionContext()->getAllocator()},
cancellationHandle_, CHUNK_SIZE, std::move(yieldTable)};
1, IdTable{getResultWidth(), allocator()}, cancellationHandle_,
CHUNK_SIZE, std::move(yieldTable)};

AD_CORRECTNESS_CHECK(joinColScan == 0);
auto permutationIdTable = ad_utility::IdTableAndFirstCol{
Expand All @@ -819,13 +790,13 @@ ProtoResult Join::computeResultForIndexScanAndIdTable(
!idTable.empty() && idTable.at(0, joinColTable).isUndefined();
std::optional<std::shared_ptr<const Result>> indexScanResult =
std::nullopt;
using FirstColView = ad_utility::IdTableAndFirstCol<IdTable>;
using GenWithDetails =
cppcoro::generator<FirstColView,
CompressedRelationReader::LazyScanMetadata>;
using GenWithDetails = decltype(convertGenerator(
std::declval<Permutation::IdTableGenerator>()));
auto rightBlocks = [&scan, idTableHasUndef, &permutationIdTable,
&indexScanResult]()
-> std::variant<cppcoro::generator<FirstColView>, GenWithDetails> {
-> std::variant<decltype(convertGenerator(
std::declval<Result::Generator>())),
GenWithDetails> {
if (idTableHasUndef) {
indexScanResult =
scan->getResult(false, ComputationMode::LAZY_IF_SUPPORTED);
Expand All @@ -841,15 +812,13 @@ ProtoResult Join::computeResultForIndexScanAndIdTable(
}();

runtimeInfo().addDetail("time-for-filtering-blocks", timer.msecs());
auto doJoin = [&rowAdder, idTableHasUndef](auto& left,
auto& right) mutable {
if (idTableHasUndef) {
ad_utility::zipperJoinForBlocksWithPotentialUndef(
left, right, std::less{}, rowAdder);
} else {
ad_utility::zipperJoinForBlocksWithoutUndef(left, right,
std::less{}, rowAdder);
}
auto doJoin = [&rowAdder](auto& left, auto& right) mutable {
// Technically we could use the zipperJoinForBlocksWithoutUndef when
// checking for `idTableHasUndef`, but the implementation of the join
// algorithm consumes all undef blocks at the start and falls back to
// the other implementation when no undef blocks could be found.
ad_utility::zipperJoinForBlocksWithPotentialUndef(
left, right, std::less{}, rowAdder);
};
auto blockForIdTable = std::array{std::move(permutationIdTable)};
std::visit(
Expand All @@ -873,3 +842,21 @@ ProtoResult Join::computeResultForIndexScanAndIdTable(
},
std::move(resultPermutation));
}
// _____________________________________________________________________________
ProtoResult Join::computeResultForTwoMaterializedInputs(
std::shared_ptr<const Result> leftRes,
std::shared_ptr<const Result> rightRes) const {
IdTable idTable{getResultWidth(), allocator()};
join(leftRes->idTable(), _leftJoinCol, rightRes->idTable(), _rightJoinCol,
&idTable);
checkCancellation();

return {std::move(idTable), resultSortedOn(),
Result::getMergedLocalVocab(*leftRes, *rightRes)};
}

// _____________________________________________________________________________
ProtoResult Join::createEmptyResult() const {
return {IdTable{getResultWidth(), allocator()}, resultSortedOn(),
LocalVocab{}};
}
Loading

0 comments on commit b9b7cae

Please sign in to comment.