From d16b195bf4b3b27295ab99a4afe27c4ccacd5eb3 Mon Sep 17 00:00:00 2001 From: Satadru Pan Date: Wed, 28 Aug 2024 14:40:13 -0700 Subject: [PATCH] Ensure inputs are alive before re-using the results of a common sub-expression (#10837) Summary: Pull Request resolved: https://github.com/facebookincubator/velox/pull/10837 We recently encountered a bug (https://github.com/facebookincubator/velox/pull/10742) that caused incorrect results when the inputs to a shared expression were replaced with a temporary vector whose lifetime was limited to a single invocation of the expression. This resulted in a new temporary input being passed to the common sub-expression, where this vector had the same memory address but differently mapped rows, and ended up re-using results from a previous invocation. To address this issue, we have made a change to ensure that before re-using the results, we verify that the inputs have not expired. Reviewed By: bikramSingh91 Differential Revision: D61739703 fbshipit-source-id: 52d4bfc9dd34a62ff3472dcfeb9d2c9f5d84ff4c --- velox/expression/Expr.cpp | 17 ++++++++++++++--- velox/expression/Expr.h | 33 ++++++++++++++++++++++++++++++++- 2 files changed, 46 insertions(+), 4 deletions(-) diff --git a/velox/expression/Expr.cpp b/velox/expression/Expr.cpp index a901f3217793..8be2572bd7a5 100644 --- a/velox/expression/Expr.cpp +++ b/velox/expression/Expr.cpp @@ -852,16 +852,27 @@ void Expr::evaluateSharedSubexpr( VectorPtr& result, TEval eval) { // Captures the inputs referenced by distinctFields_. - std::vector<const BaseVector*> expressionInputFields; + InputForSharedResults expressionInputFields; for (auto* field : distinctFields_) { - expressionInputFields.push_back( - context.getField(field->index(context)).get()); + expressionInputFields.addInput(context.getField(field->index(context))); } // Find the cached results for the same inputs, or create an entry if one // doesn't exist. 
auto sharedSubexprResultsIter = sharedSubexprResults_.find(expressionInputFields); + + // If any of the input vector is freed/expired, remove the entry from the + // results cache. + if (sharedSubexprResultsIter != sharedSubexprResults_.end() && + sharedSubexprResultsIter->first.isExpired()) { + sharedSubexprResults_.erase(sharedSubexprResultsIter); + VELOX_DCHECK( + sharedSubexprResults_.find(expressionInputFields) == + sharedSubexprResults_.end()); + sharedSubexprResultsIter = sharedSubexprResults_.end(); + } + if (sharedSubexprResultsIter == sharedSubexprResults_.end()) { auto maxSharedSubexprResultsCached = context.execCtx() ->queryCtx() diff --git a/velox/expression/Expr.h b/velox/expression/Expr.h index 2880b99d48c6..b425fd392bc9 100644 --- a/velox/expression/Expr.h +++ b/velox/expression/Expr.h @@ -608,6 +608,37 @@ class Expr { std::vector<VectorPtr> inputValues_; + /// Represents a set of inputs referenced by 'distinctFields_' that are + /// captured when the 'evaluateSharedSubexpr()' method is called on a shared + /// sub-expression. The purpose of this class is to ensure that cached + /// results are re-used for the correct set of live input vectors. + class InputForSharedResults { + public: + void addInput(const std::shared_ptr<BaseVector>& input) { + inputVectors_.push_back(input.get()); + inputWeakVectors_.push_back(input); + } + + bool operator<(const InputForSharedResults& other) const { + return inputVectors_ < other.inputVectors_; + } + + bool isExpired() const { + for (const auto& input : inputWeakVectors_) { + if (input.expired()) { + return true; + } + } + return false; + } + + private: + // Used as a key in a map that keeps track of cached results. + std::vector<const BaseVector*> inputVectors_; + // Used to check if inputs have expired. + std::vector<std::weak_ptr<BaseVector>> inputWeakVectors_; + }; + struct SharedResults { // The rows for which 'sharedSubexprValues_' has a value. 
std::unique_ptr<SelectivityVector> sharedSubexprRows_ = nullptr; @@ -617,7 +648,7 @@ class Expr { // Maps the inputs referenced by distinctFields_ captuered when // evaluateSharedSubexpr() is called to the cached shared results. - std::map<std::vector<const BaseVector*>, SharedResults> sharedSubexprResults_; + std::map<InputForSharedResults, SharedResults> sharedSubexprResults_; // Pointers to the last base vector of cachable dictionary input. Used to // check if the current input's base vector is the same as the last. If it's