Ensure inputs are alive before re-using the results of a common sub-expression (facebookincubator#10837)

Summary:
Pull Request resolved: facebookincubator#10837

We recently encountered a bug (facebookincubator#10742) that caused incorrect results
when the inputs to a shared expression were replaced with a temporary
vector whose lifetime was limited to a single invocation of the
expression. On a later invocation, a new temporary input could be
allocated at the same memory address but with differently mapped rows.
Because cached results are keyed by the raw addresses of the input
vectors, the common sub-expression matched the stale cache entry and
re-used results from the previous invocation. To address this issue,
we now verify that the inputs have not expired before re-using the
cached results.
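
The failure mode and the fix can be illustrated with a minimal, self-contained sketch. This is not the Velox code: `FakeVector`, `InputKey`, `lookupLive`, and `staleEntryIsDiscarded` are hypothetical names, and address re-use is simulated deterministically by pointing two independent `shared_ptr`s (with a no-op deleter) at the same static object, rather than relying on the allocator.

```cpp
#include <cassert>
#include <map>
#include <memory>
#include <vector>

// Hypothetical stand-in for a Velox vector; not the real BaseVector.
struct FakeVector {
  int payload;
};

// Simplified key: raw addresses define ordering, weak_ptrs track liveness.
class InputKey {
 public:
  void addInput(const std::shared_ptr<FakeVector>& input) {
    raw_.push_back(input.get());
    weak_.push_back(input);
  }

  // Only the raw addresses take part in the ordering.
  bool operator<(const InputKey& other) const {
    return raw_ < other.raw_;
  }

  // True if any captured input has been destroyed since it was added.
  bool isExpired() const {
    for (const auto& w : weak_) {
      if (w.expired()) {
        return true;
      }
    }
    return false;
  }

 private:
  std::vector<const FakeVector*> raw_;
  std::vector<std::weak_ptr<FakeVector>> weak_;
};

// Returns true if a live cache entry exists for 'key'. An entry whose
// inputs have expired is erased and reported as a miss.
bool lookupLive(std::map<InputKey, int>& cache, const InputKey& key) {
  auto it = cache.find(key);
  if (it != cache.end() && it->first.isExpired()) {
    cache.erase(it);
    return false;
  }
  return it != cache.end();
}

// Simulates two inputs with distinct lifetimes at the same address.
bool staleEntryIsDiscarded() {
  static FakeVector slot{0};
  auto noopDeleter = [](FakeVector*) {};
  std::map<InputKey, int> cache;
  {
    std::shared_ptr<FakeVector> first(&slot, noopDeleter);
    InputKey key;
    key.addInput(first);
    cache.emplace(key, 42);
    if (!lookupLive(cache, key)) {
      return false; // A live input must hit the cache.
    }
  } // 'first' is released here, so the cached key's weak_ptr expires.

  std::shared_ptr<FakeVector> second(&slot, noopDeleter);
  InputKey freshKey;
  freshKey.addInput(second);
  // Same address, so find() matches the old entry, but it is expired.
  const bool hit = lookupLive(cache, freshKey);
  return !hit && cache.empty();
}
```

Calling `staleEntryIsDiscarded()` exercises both paths: a hit while the original input is alive, and a miss with eviction once a different input appears at the same address.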

Reviewed By: bikramSingh91

Differential Revision: D61739703

fbshipit-source-id: 52d4bfc9dd34a62ff3472dcfeb9d2c9f5d84ff4c
pansatadru authored and facebook-github-bot committed Aug 28, 2024
1 parent 47e8958 commit d16b195
Showing 2 changed files with 46 additions and 4 deletions.
17 changes: 14 additions & 3 deletions velox/expression/Expr.cpp
@@ -852,16 +852,27 @@ void Expr::evaluateSharedSubexpr(
     VectorPtr& result,
     TEval eval) {
   // Captures the inputs referenced by distinctFields_.
-  std::vector<const BaseVector*> expressionInputFields;
+  InputForSharedResults expressionInputFields;
   for (auto* field : distinctFields_) {
-    expressionInputFields.push_back(
-        context.getField(field->index(context)).get());
+    expressionInputFields.addInput(context.getField(field->index(context)));
   }

   // Find the cached results for the same inputs, or create an entry if one
   // doesn't exist.
   auto sharedSubexprResultsIter =
       sharedSubexprResults_.find(expressionInputFields);
+
+  // If any of the input vectors is freed/expired, remove the entry from the
+  // results cache.
+  if (sharedSubexprResultsIter != sharedSubexprResults_.end() &&
+      sharedSubexprResultsIter->first.isExpired()) {
+    sharedSubexprResults_.erase(sharedSubexprResultsIter);
+    VELOX_DCHECK(
+        sharedSubexprResults_.find(expressionInputFields) ==
+        sharedSubexprResults_.end());
+    sharedSubexprResultsIter = sharedSubexprResults_.end();
+  }
+
   if (sharedSubexprResultsIter == sharedSubexprResults_.end()) {
     auto maxSharedSubexprResultsCached = context.execCtx()
                                              ->queryCtx()
33 changes: 32 additions & 1 deletion velox/expression/Expr.h
@@ -608,6 +608,37 @@ class Expr {

   std::vector<VectorPtr> inputValues_;

+  /// Represents a set of inputs referenced by 'distinctFields_' that are
+  /// captured when the 'evaluateSharedSubexpr()' method is called on a shared
+  /// sub-expression. The purpose of this class is to ensure that cached
+  /// results are re-used for the correct set of live input vectors.
+  class InputForSharedResults {
+   public:
+    void addInput(const std::shared_ptr<BaseVector>& input) {
+      inputVectors_.push_back(input.get());
+      inputWeakVectors_.push_back(input);
+    }
+
+    bool operator<(const InputForSharedResults& other) const {
+      return inputVectors_ < other.inputVectors_;
+    }
+
+    bool isExpired() const {
+      for (const auto& input : inputWeakVectors_) {
+        if (input.expired()) {
+          return true;
+        }
+      }
+      return false;
+    }
+
+   private:
+    // Used as a key in a map that keeps track of cached results.
+    std::vector<const BaseVector*> inputVectors_;
+    // Used to check if inputs have expired.
+    std::vector<std::weak_ptr<BaseVector>> inputWeakVectors_;
+  };
+
   struct SharedResults {
     // The rows for which 'sharedSubexprValues_' has a value.
     std::unique_ptr<SelectivityVector> sharedSubexprRows_ = nullptr;
@@ -617,7 +648,7 @@ class Expr {

   // Maps the inputs referenced by distinctFields_ captured when
   // evaluateSharedSubexpr() is called to the cached shared results.
-  std::map<std::vector<const BaseVector*>, SharedResults> sharedSubexprResults_;
+  std::map<InputForSharedResults, SharedResults> sharedSubexprResults_;

   // Pointers to the last base vector of cachable dictionary input. Used to
   // check if the current input's base vector is the same as the last. If it's
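
A note on the map key: `InputForSharedResults::operator<` compares only the raw-pointer vectors, and `std::vector`'s `operator<` is lexicographic. `std::map` treats two keys as the same entry when neither is less than the other, so the weak pointers play no role in lookup. That is why a fresh key with matching addresses still finds an expired entry, which must then be erased explicitly. The sketch below illustrates this equivalence rule with plain `int` pointers; `equivalentKeys` and `demoKeyEquivalence` are hypothetical helper names, not part of Velox.

```cpp
#include <cassert>
#include <map>
#include <vector>

using Key = std::vector<const int*>;

// True when std::map (with the default std::less) would treat 'a' and 'b'
// as the same key: neither compares less than the other.
bool equivalentKeys(const Key& a, const Key& b) {
  return !(a < b) && !(b < a);
}

bool demoKeyEquivalence() {
  static int slots[2] = {1, 2};
  Key k1{&slots[0], &slots[1]};
  Key k2{&slots[0], &slots[1]}; // Same addresses, separately built key.
  Key k3{&slots[1], &slots[0]}; // Same addresses, different order.

  std::map<Key, int> cache;
  cache[k1] = 7;

  return equivalentKeys(k1, k2) && !equivalentKeys(k1, k3) &&
      cache.count(k2) == 1 && cache.count(k3) == 0;
}
```

Lookup with `k2` hits the entry stored under `k1`, while `k3` misses because the comparison is order-sensitive.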
