Skip to content

Commit

Permalink
Implement lazy join (#1524)
Browse files Browse the repository at this point in the history
This PR lazily computes single-column JOINs. Both inputs to the JOIN still have to be sorted, because the only supported algorithm so far is a sort-merge join, bot star joins between (large) Index Scans can heavily benefit from this optimization.
  • Loading branch information
RobinTF authored Nov 25, 2024
1 parent afd35ec commit a9b9862
Show file tree
Hide file tree
Showing 11 changed files with 949 additions and 210 deletions.
80 changes: 56 additions & 24 deletions src/engine/AddCombinedRowToTable.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ class AddCombinedRowToIdTable {
size_t numJoinColumns_;
std::optional<std::array<IdTableView<0>, 2>> inputLeftAndRight_;
IdTable resultTable_;
LocalVocab mergedVocab_{};
std::array<const LocalVocab*, 2> currentVocabs_{nullptr, nullptr};

// This struct stores the information, which row indices from the input are
// combined into a given row index in the output, i.e. "To obtain the
Expand Down Expand Up @@ -62,7 +64,7 @@ class AddCombinedRowToIdTable {
// This callback is called with the result as an argument each time `flush()`
// is called. It can be used to consume parts of the result early, before the
// complete operation has finished.
using BlockwiseCallback = std::function<void(IdTable&)>;
using BlockwiseCallback = std::function<void(IdTable&, LocalVocab&)>;
[[no_unique_address]] BlockwiseCallback blockwiseCallback_{ad_utility::noop};

using CancellationHandle = ad_utility::SharedCancellationHandle;
Expand Down Expand Up @@ -124,42 +126,60 @@ class AddCombinedRowToIdTable {
}
}

// Unwrap type `T` to get an `IdTableView<0>`, even if it's not an
// `IdTableView<0>`. Identity for `IdTableView<0>`.
template <typename T>
static IdTableView<0> toView(const T& table) {
if constexpr (requires { table.template asStaticView<0>(); }) {
return table.template asStaticView<0>();
} else {
return table;
}
}

// Merge the local vocab contained in `T` with the `mergedVocab_` and set the
// passed pointer reference to that vocab.
template <typename T>
void mergeVocab(const T& table, const LocalVocab*& currentVocab) {
AD_CORRECTNESS_CHECK(currentVocab == nullptr);
if constexpr (requires { table.getLocalVocab(); }) {
currentVocab = &table.getLocalVocab();
mergedVocab_.mergeWith(std::span{&table.getLocalVocab(), 1});
}
}

// Flush remaining pending entries before changing the input.
void flushBeforeInputChange() {
// Clear to avoid unnecessary merge.
currentVocabs_ = {nullptr, nullptr};
if (nextIndex_ != 0) {
AD_CORRECTNESS_CHECK(inputLeftAndRight_.has_value());
flush();
} else {
// Clear vocab when no rows were written.
mergedVocab_ = LocalVocab{};
}
}

// Set or reset the input. All following calls to `addRow` then refer to
// indices in the new input. Before resetting, `flush()` is called, so all the
// rows from the previous inputs get materialized before deleting the old
// inputs. The arguments to `inputLeft` and `inputRight` can either be
// `IdTable` or `IdTableView<0>`, or any other type that has a
// `asStaticView<0>` method that returns an `IdTableView<0>`.
void setInput(const auto& inputLeft, const auto& inputRight) {
auto toView = []<typename T>(const T& table) {
if constexpr (requires { table.template asStaticView<0>(); }) {
return table.template asStaticView<0>();
} else {
return table;
}
};
if (nextIndex_ != 0) {
AD_CORRECTNESS_CHECK(inputLeftAndRight_.has_value());
flush();
}
flushBeforeInputChange();
mergeVocab(inputLeft, currentVocabs_.at(0));
mergeVocab(inputRight, currentVocabs_.at(1));
inputLeftAndRight_ = std::array{toView(inputLeft), toView(inputRight)};
checkNumColumns();
}

// Only set the left input. After this it is only allowed to call
// `addOptionalRow` and not `addRow` until `setInput` has been called again.
void setOnlyLeftInputForOptionalJoin(const auto& inputLeft) {
auto toView = []<typename T>(const T& table) {
if constexpr (requires { table.template asStaticView<0>(); }) {
return table.template asStaticView<0>();
} else {
return table;
}
};
if (nextIndex_ != 0) {
AD_CORRECTNESS_CHECK(inputLeftAndRight_.has_value());
flush();
}
flushBeforeInputChange();
mergeVocab(inputLeft, currentVocabs_.at(0));
// The right input will be empty, but with the correct number of columns.
inputLeftAndRight_ = std::array{
toView(inputLeft),
Expand Down Expand Up @@ -188,6 +208,8 @@ class AddCombinedRowToIdTable {
return std::move(resultTable_);
}

LocalVocab& localVocab() { return mergedVocab_; }

// Disable copying and moving, it is currently not needed and makes it harder
// to reason about
AddCombinedRowToIdTable(const AddCombinedRowToIdTable&) = delete;
Expand Down Expand Up @@ -320,7 +342,17 @@ class AddCombinedRowToIdTable {
indexBuffer_.clear();
optionalIndexBuffer_.clear();
nextIndex_ = 0;
std::invoke(blockwiseCallback_, result);
std::invoke(blockwiseCallback_, result, mergedVocab_);
// The current `IdTable`s might still be active, so we have to merge the
// local vocabs again if all other sets were moved-out.
if (resultTable_.empty()) {
// Make sure to reset `mergedVocab_` so it is in a valid state again.
mergedVocab_ = LocalVocab{};
// Only merge non-null vocabs.
auto range = currentVocabs_ | std::views::filter(toBool) |
std::views::transform(dereference);
mergedVocab_.mergeWith(std::ranges::ref_view{range});
}
}
const IdTableView<0>& inputLeft() const {
return inputLeftAndRight_.value()[0];
Expand Down
Loading

0 comments on commit a9b9862

Please sign in to comment.