From 1df2488b53c0d6437fc4d090ccf575d8cb4e4757 Mon Sep 17 00:00:00 2001 From: garenshi Date: Wed, 20 Nov 2024 19:49:17 +0800 Subject: [PATCH 1/8] [Nereids](fix) fix lambda expression slot can not bind --- .../rowset/segment_v2/segment_iterator.cpp | 9 +- .../exprs/lambda_function/lambda_function.h | 9 +- .../lambda_function/varray_map_function.cpp | 219 ++++++++++++++---- be/src/vec/exprs/vcolumn_ref.h | 10 +- be/src/vec/exprs/vexpr.h | 3 + be/src/vec/exprs/vlambda_function_call_expr.h | 1 + .../apache/doris/nereids/analyzer/Scope.java | 11 +- .../rules/analysis/BindExpression.java | 2 +- .../rules/analysis/ExpressionAnalyzer.java | 5 +- .../rules/analysis/SubExprAnalyzer.java | 11 +- .../test_array_map_function.groovy | 3 +- .../datetime_functions/test_convert_tz.groovy | 3 +- 12 files changed, 214 insertions(+), 72 deletions(-) diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.cpp b/be/src/olap/rowset/segment_v2/segment_iterator.cpp index 4ee73547c117e9..d7a331f53f6bea 100644 --- a/be/src/olap/rowset/segment_v2/segment_iterator.cpp +++ b/be/src/olap/rowset/segment_v2/segment_iterator.cpp @@ -2175,11 +2175,10 @@ Status SegmentIterator::_next_batch_internal(vectorized::Block* block) { if (block->rows() == 0) { vectorized::MutableColumnPtr col0 = std::move(*block->get_by_position(0).column).mutate(); - auto res_column = vectorized::ColumnString::create(); - res_column->insert_data("", 0); - auto col_const = vectorized::ColumnConst::create(std::move(res_column), - selected_size); - block->replace_by_position(0, std::move(col_const)); + auto tmp_indicator_col = + block->get_by_position(0).type->create_column_const_with_default_value( + selected_size); + block->replace_by_position(0, std::move(tmp_indicator_col)); _output_index_result_column_for_expr(_sel_rowid_idx.data(), selected_size, block); block->shrink_char_type_column_suffix_zero(_char_type_idx_no_0); diff --git a/be/src/vec/exprs/lambda_function/lambda_function.h b/be/src/vec/exprs/lambda_function/lambda_function.h index 184b4c2cc38abf..07302954688271 100644 --- a/be/src/vec/exprs/lambda_function/lambda_function.h +++ b/be/src/vec/exprs/lambda_function/lambda_function.h @@ -17,7 +17,7 @@ #pragma once -#include +#include #include "common/status.h" #include "vec/core/block.h" @@ -31,9 +31,16 @@ class LambdaFunction { virtual std::string get_name() const = 0; + virtual doris::Status prepare(RuntimeState* state) { + batch_size = state->batch_size(); + return Status::OK(); + } + virtual doris::Status execute(VExprContext* context, doris::vectorized::Block* block, int* result_column_id, const DataTypePtr& result_type, const VExprSPtrs& children) = 0; + + int batch_size; }; using LambdaFunctionPtr = std::shared_ptr; diff --git a/be/src/vec/exprs/lambda_function/varray_map_function.cpp b/be/src/vec/exprs/lambda_function/varray_map_function.cpp index f80cffa166eac4..f28485c7df3981 100644 --- a/be/src/vec/exprs/lambda_function/varray_map_function.cpp +++ b/be/src/vec/exprs/lambda_function/varray_map_function.cpp @@ -17,7 +17,11 @@ #include #include -#include +#include +#include +#include +#include +#include #include "common/status.h" #include "vec/aggregate_functions/aggregate_function.h" @@ -42,6 +46,17 @@ namespace doris::vectorized { class VExprContext; +struct LambdaArgs { + std::vector output_slot_ref_indexs; + int64_t current_row_idx = 0; + int64_t current_offset_in_array = 0; + size_t array_start = 0; + int64_t cur_size = 0; + const ColumnArray::Offsets64* offsets_ptr = nullptr; + int current_repeat_times = 0; + bool eos = false; +}; + class ArrayMapFunction : public LambdaFunction { ENABLE_FACTORY_CREATOR(ArrayMapFunction); @@ -57,8 +72,32 @@ class ArrayMapFunction : public LambdaFunction { doris::Status execute(VExprContext* context, doris::vectorized::Block* block, int* result_column_id, const DataTypePtr& result_type, const VExprSPtrs& children) override { - ///* array_map(lambda,arg1,arg2,.....) */// + LambdaArgs args; + // collect used slot ref in lambda function body + _collect_slot_ref_column_id(children[0], args); + + int gap = 0; + if (!args.output_slot_ref_indexs.empty()) { + auto max_id = std::max_element(args.output_slot_ref_indexs.begin(), args.output_slot_ref_indexs.end()); + gap = *max_id + 1; + _set_column_ref_column_id(children[0], gap); + } + + std::vector names(gap); + DataTypes data_types(gap); + for (int i = 0; i < gap; ++i) { + if (_contains_column_id(args, i)) { + names[i] = block->get_by_position(i).name; + data_types[i] = block->get_by_position(i).type; + } else { + // padding some mock data + names[i] = "temp"; + data_types[i] = std::make_shared(); + } + } + + ///* array_map(lambda,arg1,arg2,.....) */// //1. child[1:end]->execute(src_block) doris::vectorized::ColumnNumbers arguments(children.size() - 1); for (int i = 1; i < children.size(); ++i) { @@ -70,21 +109,20 @@ class ArrayMapFunction : public LambdaFunction { // used for save column array outside null map auto outside_null_map = ColumnUInt8::create(block->get_by_position(arguments[0]) - .column->convert_to_full_column_if_const() - ->size(), + .column->convert_to_full_column_if_const() + ->size(), 0); // offset column MutableColumnPtr array_column_offset; size_t nested_array_column_rows = 0; ColumnPtr first_array_offsets = nullptr; - //2. get the result column from executed expr, and the needed is nested column of array - Block lambda_block; + std::vector lambda_datas(arguments.size()); + for (int i = 0; i < arguments.size(); ++i) { const auto& array_column_type_name = block->get_by_position(arguments[i]); auto column_array = array_column_type_name.column->convert_to_full_column_if_const(); auto type_array = array_column_type_name.type; - if (type_array->is_nullable()) { // get the nullmap of nullable column const auto& column_array_nullmap = @@ -92,11 +130,11 @@ class ArrayMapFunction : public LambdaFunction { // get the array column from nullable column column_array = assert_cast(column_array.get()) - ->get_nested_column_ptr(); + ->get_nested_column_ptr(); // get the nested type from nullable type type_array = assert_cast(array_column_type_name.type.get()) - ->get_nested_type(); + ->get_nested_type(); // need to union nullmap from all columns VectorizedUtils::update_null_map(outside_null_map->get_data(), @@ -113,75 +151,172 @@ class ArrayMapFunction : public LambdaFunction { const auto& off_data = assert_cast( col_array.get_offsets_column()); array_column_offset = off_data.clone_resized(col_array.get_offsets_column().size()); + args.offsets_ptr = &col_array.get_offsets(); } else { // select array_map((x,y)->x+y,c_array1,[0,1,2,3]) from array_test2; // c_array1: [0,1,2,3,4,5,6,7,8,9] const auto& array_offsets = assert_cast(*first_array_offsets) - .get_data(); + .get_data(); if (nested_array_column_rows != col_array.get_data_ptr()->size() || (!array_offsets.empty() && memcmp(array_offsets.data(), col_array.get_offsets().data(), sizeof(array_offsets[0]) * array_offsets.size()) != 0)) { return Status::InvalidArgument( - "in array map function, the input column size " - "are " - "not equal completely, nested column data rows 1st size is {}, {}th " - "size is {}.", - nested_array_column_rows, i + 1, col_array.get_data_ptr()->size()); + "in array map function, the input column size " + "are " + "not equal completely, nested column data rows 1st size is {}, {}th " + "size is {}.", + nested_array_column_rows, i + 1, col_array.get_data_ptr()->size()); } } - - // insert the data column to the new block - ColumnWithTypeAndName data_column {col_array.get_data_ptr(), col_type.get_nested_type(), - "R" + array_column_type_name.name}; - lambda_block.insert(std::move(data_column)); + lambda_datas[i] = col_array.get_data_ptr(); + names.push_back("R" + array_column_type_name.name); + data_types.push_back(col_type.get_nested_type()); } - //3. child[0]->execute(new_block) - RETURN_IF_ERROR(children[0]->execute(context, &lambda_block, result_column_id)); + ColumnPtr result_col = nullptr; + DataTypePtr res_type; + std::string res_name; + + //process first row + args.array_start = (*args.offsets_ptr)[args.current_row_idx - 1]; + args.cur_size = (*args.offsets_ptr)[args.current_row_idx] - args.array_start; + + while (args.current_row_idx < block->rows()) { + Block lambda_block; + for (int i = 0; i < names.size(); i++) { + ColumnWithTypeAndName data_column {data_types[i], names[i]}; + lambda_block.insert(std::move(data_column)); + } + + MutableBlock m_lambda_block(&lambda_block); + MutableColumns& columns = m_lambda_block.mutable_columns(); + while (columns[gap]->size() < batch_size) { + long max_step = batch_size - columns[gap]->size(); + long current_step = std::min(max_step, (long)(args.cur_size - args.current_offset_in_array)); + size_t pos = args.array_start + args.current_offset_in_array; + for (int i = 0; i < arguments.size(); ++i) { + columns[gap + i]->insert_range_from(*lambda_datas[i], pos, current_step); + } + args.current_offset_in_array += current_step; + args.current_repeat_times += current_step; + if (args.current_offset_in_array >= args.cur_size) { + args.eos = true; + } else { + _extend_data(columns, block, args, gap); + } + if (args.eos) { + _extend_data(columns, block, args, gap); + args.current_row_idx++; + args.current_offset_in_array = 0; + if (args.current_row_idx >= block->rows()) { + break; + } + args.eos = false; + args.array_start = (*args.offsets_ptr)[args.current_row_idx - 1]; + args.cur_size = (*args.offsets_ptr)[args.current_row_idx] - args.array_start; + } + } + + lambda_block.set_columns(std::move(columns)); - auto res_col = lambda_block.get_by_position(*result_column_id) - .column->convert_to_full_column_if_const(); - auto res_type = lambda_block.get_by_position(*result_column_id).type; - auto res_name = lambda_block.get_by_position(*result_column_id).name; + //3. child[0]->execute(new_block) + RETURN_IF_ERROR(children[0]->execute(context, &lambda_block, result_column_id)); + + auto res_col = lambda_block.get_by_position(*result_column_id) + .column->convert_to_full_column_if_const(); + res_type = lambda_block.get_by_position(*result_column_id).type; + res_name = lambda_block.get_by_position(*result_column_id).name; + if (!result_col) { + result_col = std::move(res_col); + } else { + MutableColumnPtr column = (*std::move(result_col)).mutate(); + column->insert_range_from(*res_col, 0, res_col->size()); + } + } //4. get the result column after execution, reassemble it into a new array column, and return. ColumnWithTypeAndName result_arr; if (result_type->is_nullable()) { if (res_type->is_nullable()) { result_arr = {ColumnNullable::create( - ColumnArray::create(res_col, std::move(array_column_offset)), - std::move(outside_null_map)), - result_type, res_name}; + ColumnArray::create(result_col, std::move(array_column_offset)), + std::move(outside_null_map)), + result_type, res_name}; } else { // deal with eg: select array_map(x -> x is null, [null, 1, 2]); // need to create the nested column null map for column array - auto nested_null_map = ColumnUInt8::create(res_col->size(), 0); + auto nested_null_map = ColumnUInt8::create(result_col->size(), 0); result_arr = { - ColumnNullable::create( - ColumnArray::create( - ColumnNullable::create(res_col, std::move(nested_null_map)), - std::move(array_column_offset)), - std::move(outside_null_map)), - result_type, res_name}; + ColumnNullable::create( + ColumnArray::create( + ColumnNullable::create(result_col, std::move(nested_null_map)), + std::move(array_column_offset)), + std::move(outside_null_map)), + result_type, res_name}; } } else { if (res_type->is_nullable()) { - result_arr = {ColumnArray::create(res_col, std::move(array_column_offset)), - result_type, res_name}; + result_arr = {ColumnArray::create(result_col, std::move(array_column_offset)), + result_type, res_name}; } else { - auto nested_null_map = ColumnUInt8::create(res_col->size(), 0); + auto nested_null_map = ColumnUInt8::create(result_col->size(), 0); result_arr = {ColumnArray::create( - ColumnNullable::create(res_col, std::move(nested_null_map)), - std::move(array_column_offset)), - result_type, res_name}; + ColumnNullable::create(result_col, std::move(nested_null_map)), + std::move(array_column_offset)), + result_type, res_name}; } } block->insert(std::move(result_arr)); *result_column_id = block->columns() - 1; + return Status::OK(); } + +private: + + bool _contains_column_id(LambdaArgs& args, int id) { + const auto it = std::find(args.output_slot_ref_indexs.begin(), args.output_slot_ref_indexs.end(), id); + return it != args.output_slot_ref_indexs.end(); + } + + void _set_column_ref_column_id(VExprSPtr expr, int gap) { + for (const auto& child : expr->children()) { + if (child->is_column_ref()) { + auto* ref = static_cast(child.get()); + ref->set_gap(gap); + } else { + _set_column_ref_column_id(child, gap); + } + } + } + + void _collect_slot_ref_column_id(VExprSPtr expr, LambdaArgs& args) { + for (const auto& child : expr->children()) { + if (child->is_slot_ref()) { + const auto* ref = static_cast(child.get()); + args.output_slot_ref_indexs.push_back(ref->column_id()); + } else { + _collect_slot_ref_column_id(child, args); + } + } + } + + void _extend_data(std::vector& columns, Block* block, LambdaArgs& args,int size) { + if (!args.current_repeat_times || !size) { + return; + } + for (int i = 0; i < size; i++) { + if (_contains_column_id(args, i)) { + auto src_column = block->get_by_position(i).column->convert_to_full_column_if_const(); + columns[i]->insert_many_from(*src_column, args.current_row_idx, args.current_repeat_times); + } else { + columns[i]->insert_many_defaults(args.current_repeat_times); + } + } + args.current_repeat_times = 0; + } }; void register_function_array_map(doris::vectorized::LambdaFunctionFactory& factory) { diff --git a/be/src/vec/exprs/vcolumn_ref.h b/be/src/vec/exprs/vcolumn_ref.h index a763797880e361..6904a9fc87c0db 100644 --- a/be/src/vec/exprs/vcolumn_ref.h +++ b/be/src/vec/exprs/vcolumn_ref.h @@ -20,6 +20,7 @@ #include "runtime/runtime_state.h" #include "vec/exprs/vexpr.h" #include "vec/functions/function.h" +#include namespace doris { namespace vectorized { @@ -57,7 +58,7 @@ class VColumnRef final : public VExpr { Status execute(VExprContext* context, Block* block, int* result_column_id) override { DCHECK(_open_finished || _getting_const_col); - *result_column_id = _column_id; + *result_column_id = _column_id + _gap; return Status::OK(); } @@ -67,6 +68,12 @@ class VColumnRef final : public VExpr { const std::string& expr_name() const override { return _column_name; } + void set_gap(int gap) { + if (_gap == 0) { + _gap = gap; + } + } + std::string debug_string() const override { std::stringstream out; out << "VColumnRef(slot_id: " << _column_id << ",column_name: " << _column_name @@ -76,6 +83,7 @@ class VColumnRef final : public VExpr { private: int _column_id; + std::atomic _gap = 0; std::string _column_name; }; } // namespace vectorized diff --git a/be/src/vec/exprs/vexpr.h b/be/src/vec/exprs/vexpr.h index 3456fb431a48cb..2580cf7ddc88f9 100644 --- a/be/src/vec/exprs/vexpr.h +++ b/be/src/vec/exprs/vexpr.h @@ -148,6 +148,9 @@ class VExpr { TypeDescriptor type() { return _type; } bool is_slot_ref() const { return _node_type == TExprNodeType::SLOT_REF; } + + bool is_column_ref() const { return _node_type == TExprNodeType::COLUMN_REF; } + virtual bool is_literal() const { return false; } TExprNodeType::type node_type() const { return _node_type; } diff --git a/be/src/vec/exprs/vlambda_function_call_expr.h b/be/src/vec/exprs/vlambda_function_call_expr.h index 44d22b1f9ebbea..7b0ea1dcb5a75d 100644 --- a/be/src/vec/exprs/vlambda_function_call_expr.h +++ b/be/src/vec/exprs/vlambda_function_call_expr.h @@ -50,6 +50,7 @@ class VLambdaFunctionCallExpr : public VExpr { return Status::InternalError("Lambda Function {} is not implemented.", _fn.name.function_name); } + RETURN_IF_ERROR(_lambda_function->prepare(state)); _prepare_finished = true; return Status::OK(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/Scope.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/Scope.java index dbcbea7c104b5a..7c77c680ff287b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/Scope.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/Scope.java @@ -18,7 +18,6 @@ package org.apache.doris.nereids.analyzer; import org.apache.doris.nereids.trees.expressions.Slot; -import org.apache.doris.nereids.trees.expressions.SubqueryExpr; import org.apache.doris.nereids.util.Utils; import com.google.common.base.Suppliers; @@ -62,20 +61,18 @@ public class Scope { private final Optional outerScope; private final List slots; - private final Optional ownerSubquery; private final Set correlatedSlots; private final boolean buildNameToSlot; private final Supplier> nameToSlot; public Scope(List slots) { - this(Optional.empty(), slots, Optional.empty()); + this(Optional.empty(), slots); } /** Scope */ - public Scope(Optional outerScope, List slots, Optional subqueryExpr) { + public Scope(Optional outerScope, List slots) { this.outerScope = Objects.requireNonNull(outerScope, "outerScope can not be null"); this.slots = Utils.fastToImmutableList(Objects.requireNonNull(slots, "slots can not be null")); - this.ownerSubquery = Objects.requireNonNull(subqueryExpr, "subqueryExpr can not be null"); this.correlatedSlots = Sets.newLinkedHashSet(); this.buildNameToSlot = slots.size() > 500; this.nameToSlot = buildNameToSlot ? Suppliers.memoize(this::buildNameToSlot) : null; @@ -89,10 +86,6 @@ public Optional getOuterScope() { return outerScope; } - public Optional getSubquery() { - return ownerSubquery; - } - public Set getCorrelatedSlots() { return correlatedSlots; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindExpression.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindExpression.java index cde659b91021ad..1e481542baec12 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindExpression.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindExpression.java @@ -1229,7 +1229,7 @@ private E checkBoundExceptLambda(E expression, Plan plan) private Scope toScope(CascadesContext cascadesContext, List slots) { Optional outerScope = cascadesContext.getOuterScope(); if (outerScope.isPresent()) { - return new Scope(outerScope, slots, outerScope.get().getSubquery()); + return new Scope(outerScope, slots); } else { return new Scope(slots); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/ExpressionAnalyzer.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/ExpressionAnalyzer.java index adc68ac6ecac1b..6abbbda447c8ec 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/ExpressionAnalyzer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/ExpressionAnalyzer.java @@ -833,8 +833,9 @@ private UnboundFunction bindHighOrderFunction(UnboundFunction unboundFunction, E .map(ArrayItemReference::toSlot) .collect(ImmutableList.toImmutableList()); - ExpressionAnalyzer lambdaAnalyzer = new ExpressionAnalyzer(currentPlan, new Scope(boundedSlots), - context == null ? null : context.cascadesContext, true, false) { + ExpressionAnalyzer lambdaAnalyzer = new ExpressionAnalyzer(currentPlan, new Scope(Optional.of(getScope()), + boundedSlots), context == null ? null : context.cascadesContext, + true, true) { @Override protected void couldNotFoundColumn(UnboundSlot unboundSlot, String tableName) { throw new AnalysisException("Unknown lambda slot '" diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/SubExprAnalyzer.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/SubExprAnalyzer.java index 7b0ed45708251d..9a70ce24afb8ef 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/SubExprAnalyzer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/SubExprAnalyzer.java @@ -53,7 +53,6 @@ import java.util.HashSet; import java.util.List; import java.util.Objects; -import java.util.Optional; import java.util.Set; /** @@ -205,19 +204,15 @@ private AnalyzedResult analyzeSubquery(SubqueryExpr expr) { } CascadesContext subqueryContext = CascadesContext.newContextWithCteContext( cascadesContext, expr.getQueryPlan(), cascadesContext.getCteContext()); - Scope subqueryScope = genScopeWithSubquery(expr); + // don't use `getScope()` because we only need `getScope().getOuterScope()` and `getScope().getSlots()` + // otherwise unexpected errors may occur + Scope subqueryScope = new Scope(getScope().getOuterScope(), getScope().getSlots()); subqueryContext.setOuterScope(subqueryScope); subqueryContext.newAnalyzer().analyze(); return new AnalyzedResult((LogicalPlan) subqueryContext.getRewritePlan(), subqueryScope.getCorrelatedSlots()); } - private Scope genScopeWithSubquery(SubqueryExpr expr) { - return new Scope(getScope().getOuterScope(), - getScope().getSlots(), - Optional.ofNullable(expr)); - } - public Scope getScope() { return scope; } diff --git a/regression-test/suites/query_p0/sql_functions/array_functions/test_array_map_function.groovy b/regression-test/suites/query_p0/sql_functions/array_functions/test_array_map_function.groovy index 311b188e433550..1fd0c8ec147201 100644 --- a/regression-test/suites/query_p0/sql_functions/array_functions/test_array_map_function.groovy +++ b/regression-test/suites/query_p0/sql_functions/array_functions/test_array_map_function.groovy @@ -90,8 +90,7 @@ suite("test_array_map_function") { test { sql"""select /*+SET_VAR(experimental_enable_pipeline_engine=false)*/ array_map((x,y)->x+y, c_array1, c_array2) from ${tableName} where id > 10 order by id""" check{result, exception, startTime, endTime -> - assertTrue(exception != null) - logger.info(exception.message) + assertTrue(exception == null) } } diff --git a/regression-test/suites/query_p0/sql_functions/datetime_functions/test_convert_tz.groovy b/regression-test/suites/query_p0/sql_functions/datetime_functions/test_convert_tz.groovy index 2959c7f136b8e5..e382fe9ae70b84 100644 --- a/regression-test/suites/query_p0/sql_functions/datetime_functions/test_convert_tz.groovy +++ b/regression-test/suites/query_p0/sql_functions/datetime_functions/test_convert_tz.groovy @@ -16,6 +16,7 @@ // under the License. suite("test_convert_tz") { + sql "drop table if exists `cvt_tz`" sql """ CREATE TABLE `cvt_tz` ( `rowid` int NULL, @@ -34,4 +35,4 @@ suite("test_convert_tz") { order_qt_sql1 """ select convert_tz(dt, '+00:00', IF(property_value IS NULL, '+00:00', property_value)) from cvt_tz """ -} \ No newline at end of file +} From 4bdeaeb14bbbf915b38c90feda43850837648539 Mon Sep 17 00:00:00 2001 From: garenshi Date: Wed, 4 Dec 2024 19:15:02 +0800 Subject: [PATCH 2/8] 1 --- .../array_functions/test_array_map_function.groovy | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/regression-test/suites/query_p0/sql_functions/array_functions/test_array_map_function.groovy b/regression-test/suites/query_p0/sql_functions/array_functions/test_array_map_function.groovy index 1fd0c8ec147201..17de93942bfc61 100644 --- a/regression-test/suites/query_p0/sql_functions/array_functions/test_array_map_function.groovy +++ b/regression-test/suites/query_p0/sql_functions/array_functions/test_array_map_function.groovy @@ -75,8 +75,7 @@ suite("test_array_map_function") { test { sql"""select c_array1,array_max(array_map(x->countequal(c_array1,x),c_array1)) from array_test2;""" check{result, exception, startTime, endTime -> - assertTrue(exception != null) - logger.info(exception.message) + assertTrue(exception == null) } } @@ -90,7 +89,8 @@ suite("test_array_map_function") { test { sql"""select /*+SET_VAR(experimental_enable_pipeline_engine=false)*/ array_map((x,y)->x+y, c_array1, c_array2) from ${tableName} where id > 10 order by id""" check{result, exception, startTime, endTime -> - assertTrue(exception == null) + assertTrue(exception != null) + logger.info(exception.message) } } From 288efc04308244a4bb6654f3f9c87a14c386ee7d Mon Sep 17 00:00:00 2001 From: garenshi Date: Thu, 5 Dec 2024 11:43:30 +0800 Subject: [PATCH 3/8] 1 --- .../test_array_map_function_with_column.out | 41 +++++++++++ ...test_array_map_function_with_column.groovy | 70 +++++++++++++++++++ 2 files changed, 111 insertions(+) create mode 100644 regression-test/data/query_p0/sql_functions/array_functions/test_array_map_function_with_column.out create mode 100644 regression-test/suites/query_p0/sql_functions/array_functions/test_array_map_function_with_column.groovy diff --git a/regression-test/data/query_p0/sql_functions/array_functions/test_array_map_function_with_column.out b/regression-test/data/query_p0/sql_functions/array_functions/test_array_map_function_with_column.out new file mode 100644 index 00000000000000..24d7b53e2b34f1 --- /dev/null +++ b/regression-test/data/query_p0/sql_functions/array_functions/test_array_map_function_with_column.out @@ -0,0 +1,41 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !select_1 -- +\N \N \N \N \N +1 2 [1, 2, 3, 4] [null, 2, 3] [1, 1, 1, 1] +2 3 [6, 7, null, 9] [4, null, 6] [1, 1, null, 1] +3 4 \N [4, 5, 6] \N + +-- !select_2 -- +\N \N \N \N \N +1 2 [1, 2, 3, 4] [null, 2, 3] [1, 1, 1, 1] +2 3 [6, 7, null, 9] [4, null, 6] [1, 1, null, 1] +3 4 \N [4, 5, 6] \N + +-- !select_3 -- +\N \N \N \N \N +1 2 [1, 2, 3, 4] [null, 2, 3] [1, 1, 1, 1] +2 3 [6, 7, null, 9] [4, null, 6] [1, 1, null, 1] +3 4 \N [4, 5, 6] \N + +-- !select_4 -- +\N \N \N \N \N +1 2 [1, 2, 3, 4] [null, 2, 3] [1, 1, 1, 1] +2 3 [6, 7, null, 9] [4, null, 6] [1, 1, null, 1] +3 4 \N [4, 5, 6] \N + +-- !select_5 -- +\N \N \N \N \N +1 2 [1, 2, 3, 4] [null, 2, 3] [1, 1, 1, 1] +2 3 [6, 7, null, 9] [4, null, 6] [1, 1, null, 1] +3 4 \N [4, 5, 6] \N + +-- !select_6 -- +\N \N \N \N \N +4 5 [6, 7, null, 9] [4, 5, 6, 7] [0, 0, null, 0] +5 6 [10, 11, 12, 13] [8, 9, null, 11] [0, 0, null, 0] +6 7 \N \N \N + +-- !select_7 -- +4 5 [6, 7, null, 9] [4, 5, 6, 7] [0, 0, null, 0] +5 6 [10, 11, 12, 13] [8, 9, null, 11] [0, 0, null, 0] + diff --git a/regression-test/suites/query_p0/sql_functions/array_functions/test_array_map_function_with_column.groovy b/regression-test/suites/query_p0/sql_functions/array_functions/test_array_map_function_with_column.groovy new file mode 100644 index 00000000000000..024e4b57a34572 --- /dev/null +++ b/regression-test/suites/query_p0/sql_functions/array_functions/test_array_map_function_with_column.groovy @@ -0,0 +1,70 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_array_map_function_with_column") { + + def tableName = "array_test_with_column" + sql "DROP TABLE IF EXISTS ${tableName}" + sql """ + CREATE TABLE IF NOT EXISTS ${tableName} ( + `k1` int(11) NULL COMMENT "", + `k2` int(11) NULL COMMENT "", + `c_array1` ARRAY NULL COMMENT "", + `c_array2` ARRAY NULL COMMENT "" + ) ENGINE=OLAP + DISTRIBUTED BY HASH(`k1`,`k2`) BUCKETS 3 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1" + ); + """ + + + sql """INSERT INTO ${tableName} values + (1, 2, [1,2,3,4], [null,2,3]), + (2, 3, [6,7,null,9], [4,null,6]), + (3, 4, NULL, [4, 5, 6]), + (NULL, NULL, NULL, NULL); + """ + + qt_select_1 "select *,array_map(x->x+k1+k2 > k1*k2,c_array1) from ${tableName} order by k1;" + + sql "set batch_size = 1;" + qt_select_2 "select *,array_map(x->x+k1+k2 > k1*k2,c_array1) from ${tableName} order by k1;" + + sql "set batch_size = 4;" + qt_select_3 "select *,array_map(x->x+k1+k2 > k1*k2,c_array1) from ${tableName} order by k1;" + + sql "set batch_size = 6;" + qt_select_4 "select *,array_map(x->x+k1+k2 > k1*k2,c_array1) from ${tableName} order by k1;" + + sql "set batch_size = 8;" + qt_select_5 "select *,array_map(x->x+k1+k2 > k1*k2,c_array1) from ${tableName} order by k1;" + + sql "truncate table ${tableName};" + + sql """INSERT INTO ${tableName} values + (4, 5, [6,7,null,9], [4,5,6,7]), + (5, 6, [10,11,12,13], [8,9,null,11]), + (6, 7, NULL, NULL), + (NULL, NULL, NULL, NULL); + """ + qt_select_6 "select *,array_map((x,y)->x+k1+k2 > y+k1*k2,c_array1,c_array2) from ${tableName} order by k1;" + + qt_select_7 "select *,array_map((x,y)->x+k1+k2 > y+k1*k2,c_array1,c_array2) from ${tableName} where array_count((x,y) -> k1*x>y+k2, c_array1, c_array2) > 1 order by k1;" + + sql "DROP TABLE IF EXISTS ${tableName}" +} From 05e29dcb25eeae2e283d5cb598fb64b3bf397e31 Mon Sep 17 00:00:00 2001 From: garenshi Date: Thu, 5 Dec 2024 14:41:53 +0800 Subject: [PATCH 4/8] 1 --- .../lambda_function/varray_map_function.cpp | 26 +++++++++++-------- 1 file changed, 15 insertions(+), 11 deletions(-) diff --git a/be/src/vec/exprs/lambda_function/varray_map_function.cpp b/be/src/vec/exprs/lambda_function/varray_map_function.cpp index f28485c7df3981..3e4e2273fc5c22 100644 --- a/be/src/vec/exprs/lambda_function/varray_map_function.cpp +++ b/be/src/vec/exprs/lambda_function/varray_map_function.cpp @@ -20,7 +20,6 @@ #include #include #include -#include #include #include "common/status.h" @@ -42,19 +41,27 @@ #include "vec/utils/util.hpp" namespace doris::vectorized { -#include "common/compile_check_begin.h" class VExprContext; +// extend a block with all required parameters struct LambdaArgs { + // the lambda function need the column ids of all the slots std::vector output_slot_ref_indexs; + // which line is extended to the original block int64_t current_row_idx = 0; + // when a block is filled, the array may be truncated, recording where it was truncated int64_t current_offset_in_array = 0; + // the beginning position of the array size_t array_start = 0; + // the size of the array int64_t cur_size = 0; + // offset of column array const ColumnArray::Offsets64* offsets_ptr = nullptr; + // expend data of repeat times int current_repeat_times = 0; - bool eos = false; + // whether the current row of the original block has been extended + bool current_row_eos = false; }; class ArrayMapFunction : public LambdaFunction { @@ -190,8 +197,7 @@ class ArrayMapFunction : public LambdaFunction { lambda_block.insert(std::move(data_column)); } - MutableBlock m_lambda_block(&lambda_block); - MutableColumns& columns = m_lambda_block.mutable_columns(); + MutableColumns columns = lambda_block.mutate_columns(); while (columns[gap]->size() < batch_size) { long max_step = batch_size - columns[gap]->size(); long current_step = std::min(max_step, (long)(args.cur_size - args.current_offset_in_array)); @@ -202,18 +208,16 @@ class ArrayMapFunction : public LambdaFunction { args.current_offset_in_array += current_step; args.current_repeat_times += current_step; if (args.current_offset_in_array >= args.cur_size) { - args.eos = true; - } else { - _extend_data(columns, block, args, gap); + args.current_row_eos = true; } - if (args.eos) { - _extend_data(columns, block, args, gap); + _extend_data(columns, block, args, gap); + if (args.current_row_eos) { args.current_row_idx++; args.current_offset_in_array = 0; if (args.current_row_idx >= block->rows()) { break; } - args.eos = false; + args.current_row_eos = false; args.array_start = (*args.offsets_ptr)[args.current_row_idx - 1]; args.cur_size = (*args.offsets_ptr)[args.current_row_idx] - args.array_start; } From 30053387e42cee7232ede5dfbf64cb33a531a016 Mon Sep 17 00:00:00 2001 From: garenshi Date: Thu, 5 Dec 2024 15:11:08 +0800 Subject: [PATCH 5/8] 1 --- .../exprs/lambda_function/varray_map_function.cpp | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/be/src/vec/exprs/lambda_function/varray_map_function.cpp b/be/src/vec/exprs/lambda_function/varray_map_function.cpp index 3e4e2273fc5c22..f3d22040c0e97d 100644 --- a/be/src/vec/exprs/lambda_function/varray_map_function.cpp +++ b/be/src/vec/exprs/lambda_function/varray_map_function.cpp @@ -41,7 +41,7 @@ #include "vec/utils/util.hpp" namespace doris::vectorized { - +#include "common/compile_check_begin.h" class VExprContext; // extend a block with all required parameters @@ -193,7 +193,12 @@ class ArrayMapFunction : public LambdaFunction { while (args.current_row_idx < block->rows()) { Block lambda_block; for (int i = 0; i < names.size(); i++) { - ColumnWithTypeAndName data_column {data_types[i], names[i]}; + ColumnWithTypeAndName data_column; + if (_contains_column_id(args, i)) { + data_column = ColumnWithTypeAndName(data_types[i], names[i]); + } else { + data_column = ColumnWithTypeAndName(data_types[i]->create_column_const_with_default_value(0), data_types[i], names[i]); + } lambda_block.insert(std::move(data_column)); } @@ -316,7 +321,8 @@ class ArrayMapFunction : public LambdaFunction { auto src_column = block->get_by_position(i).column->convert_to_full_column_if_const(); columns[i]->insert_many_from(*src_column, args.current_row_idx, args.current_repeat_times); } else { - columns[i]->insert_many_defaults(args.current_repeat_times); + // must be column const + columns[i]->resize(columns[i]->size() + args.current_repeat_times); } } args.current_repeat_times = 0; From 77aed3d773b4580aa82417e0f54af35bec86cf9c Mon Sep 17 00:00:00 2001 From: garenshi Date: Thu, 5 Dec 2024 15:35:31 +0800 Subject: [PATCH 6/8] 1 --- be/src/vec/exprs/lambda_function/varray_map_function.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/be/src/vec/exprs/lambda_function/varray_map_function.cpp b/be/src/vec/exprs/lambda_function/varray_map_function.cpp index f3d22040c0e97d..ea3ca270f22426 100644 --- a/be/src/vec/exprs/lambda_function/varray_map_function.cpp +++ b/be/src/vec/exprs/lambda_function/varray_map_function.cpp @@ -194,7 +194,7 @@ class ArrayMapFunction : public LambdaFunction { Block lambda_block; for (int i = 0; i < names.size(); i++) { ColumnWithTypeAndName data_column; - if (_contains_column_id(args, i)) { + if (_contains_column_id(args, i) || i >= gap) { data_column = ColumnWithTypeAndName(data_types[i], names[i]); } else { data_column = ColumnWithTypeAndName(data_types[i]->create_column_const_with_default_value(0), data_types[i], names[i]); From c740ebe2e28d73d9b449183be806186bbe027932 Mon Sep 17 00:00:00 2001 From: garenshi Date: Thu, 5 Dec 2024 16:15:13 +0800 Subject: [PATCH 7/8] 1 --- be/src/vec/exprs/lambda_function/varray_map_function.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/be/src/vec/exprs/lambda_function/varray_map_function.cpp b/be/src/vec/exprs/lambda_function/varray_map_function.cpp index ea3ca270f22426..cb613c03e58e21 100644 --- a/be/src/vec/exprs/lambda_function/varray_map_function.cpp +++ b/be/src/vec/exprs/lambda_function/varray_map_function.cpp @@ -322,6 +322,7 @@ class ArrayMapFunction : public LambdaFunction { columns[i]->insert_many_from(*src_column, args.current_row_idx, args.current_repeat_times); } else { // must be column const + DCHECK(is_column_const(*columns[i])); columns[i]->resize(columns[i]->size() + args.current_repeat_times); } } From 6eeaa9f355e4a3db7073fe2394c38274a3b7d854 Mon Sep 17 00:00:00 2001 From: garenshi Date: Fri, 6 Dec 2024 15:37:09 +0800 Subject: [PATCH 8/8] 1 --- .../rowset/segment_v2/segment_iterator.cpp | 5 +- .../lambda_function/varray_map_function.cpp | 83 ++++++++++--------- be/src/vec/exprs/vcolumn_ref.h | 3 +- 3 files changed, 51 insertions(+), 40 deletions(-) diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.cpp b/be/src/olap/rowset/segment_v2/segment_iterator.cpp index d7a331f53f6bea..77241885b42c64 100644 --- a/be/src/olap/rowset/segment_v2/segment_iterator.cpp +++ b/be/src/olap/rowset/segment_v2/segment_iterator.cpp @@ -2176,8 +2176,9 @@ Status SegmentIterator::_next_batch_internal(vectorized::Block* block) { vectorized::MutableColumnPtr col0 = std::move(*block->get_by_position(0).column).mutate(); auto tmp_indicator_col = - block->get_by_position(0).type->create_column_const_with_default_value( - selected_size); + block->get_by_position(0) + .type->create_column_const_with_default_value( + selected_size); block->replace_by_position(0, std::move(tmp_indicator_col)); _output_index_result_column_for_expr(_sel_rowid_idx.data(), selected_size, block); diff --git a/be/src/vec/exprs/lambda_function/varray_map_function.cpp b/be/src/vec/exprs/lambda_function/varray_map_function.cpp index cb613c03e58e21..f7a864b92d3fb4 100644 --- a/be/src/vec/exprs/lambda_function/varray_map_function.cpp +++ b/be/src/vec/exprs/lambda_function/varray_map_function.cpp @@ -15,12 +15,13 @@ // specific language governing permissions and limitations // under the License. +#include +#include +#include + #include #include #include -#include -#include -#include #include "common/status.h" #include "vec/aggregate_functions/aggregate_function.h" @@ -85,7 +86,8 @@ class ArrayMapFunction : public LambdaFunction { int gap = 0; if (!args.output_slot_ref_indexs.empty()) { - auto max_id = std::max_element(args.output_slot_ref_indexs.begin(), args.output_slot_ref_indexs.end()); + auto max_id = std::max_element(args.output_slot_ref_indexs.begin(), + args.output_slot_ref_indexs.end()); gap = *max_id + 1; _set_column_ref_column_id(children[0], gap); } @@ -116,8 +118,8 @@ class ArrayMapFunction : public LambdaFunction { // used for save column array outside null map auto outside_null_map = ColumnUInt8::create(block->get_by_position(arguments[0]) - .column->convert_to_full_column_if_const() - ->size(), + .column->convert_to_full_column_if_const() + ->size(), 0); // offset column MutableColumnPtr array_column_offset; @@ -137,11 +139,11 @@ class ArrayMapFunction : public LambdaFunction { // get the array column from nullable column column_array = assert_cast(column_array.get()) - ->get_nested_column_ptr(); + ->get_nested_column_ptr(); // get the nested type from nullable type type_array = assert_cast(array_column_type_name.type.get()) - ->get_nested_type(); + ->get_nested_type(); // need to union nullmap from all columns VectorizedUtils::update_null_map(outside_null_map->get_data(), @@ -164,17 +166,17 @@ class ArrayMapFunction : public LambdaFunction { // c_array1: [0,1,2,3,4,5,6,7,8,9] const auto& array_offsets = assert_cast(*first_array_offsets) - .get_data(); + .get_data(); if (nested_array_column_rows != col_array.get_data_ptr()->size() || (!array_offsets.empty() && memcmp(array_offsets.data(), col_array.get_offsets().data(), sizeof(array_offsets[0]) * array_offsets.size()) != 0)) { return Status::InvalidArgument( - "in array map function, the input column size " - "are " - "not equal completely, nested column data rows 1st size is {}, {}th " - "size is {}.", - nested_array_column_rows, i + 1, col_array.get_data_ptr()->size()); + "in array map function, the input column size " + "are " + "not equal completely, nested column data rows 1st size is {}, {}th " + "size is {}.", + nested_array_column_rows, i + 1, col_array.get_data_ptr()->size()); } } lambda_datas[i] = col_array.get_data_ptr(); @@ -197,7 +199,9 @@ class ArrayMapFunction : public LambdaFunction { if (_contains_column_id(args, i) || i >= gap) { data_column = ColumnWithTypeAndName(data_types[i], names[i]); } else { - data_column = ColumnWithTypeAndName(data_types[i]->create_column_const_with_default_value(0), data_types[i], names[i]); + data_column = ColumnWithTypeAndName( + data_types[i]->create_column_const_with_default_value(0), data_types[i], + names[i]); } lambda_block.insert(std::move(data_column)); } @@ -205,7 +209,8 @@ class ArrayMapFunction : public LambdaFunction { MutableColumns columns = lambda_block.mutate_columns(); while (columns[gap]->size() < batch_size) { long max_step = batch_size - columns[gap]->size(); - long current_step = std::min(max_step, (long)(args.cur_size - args.current_offset_in_array)); + long current_step = + std::min(max_step, (long)(args.cur_size - args.current_offset_in_array)); size_t pos = args.array_start + args.current_offset_in_array; for (int i = 0; i < arguments.size(); ++i) { columns[gap + i]->insert_range_from(*lambda_datas[i], pos, current_step); @@ -234,7 +239,7 @@ class ArrayMapFunction : public LambdaFunction { RETURN_IF_ERROR(children[0]->execute(context, &lambda_block, result_column_id)); auto res_col = lambda_block.get_by_position(*result_column_id) - .column->convert_to_full_column_if_const(); + .column->convert_to_full_column_if_const(); res_type = lambda_block.get_by_position(*result_column_id).type; res_name = lambda_block.get_by_position(*result_column_id).name; if (!result_col) { @@ -249,32 +254,33 @@ class ArrayMapFunction : public LambdaFunction { ColumnWithTypeAndName result_arr; if (result_type->is_nullable()) { if (res_type->is_nullable()) { - result_arr = {ColumnNullable::create( - ColumnArray::create(result_col, std::move(array_column_offset)), - std::move(outside_null_map)), - result_type, res_name}; + result_arr = { + ColumnNullable::create( + ColumnArray::create(result_col, std::move(array_column_offset)), + std::move(outside_null_map)), + result_type, res_name}; } else { // deal with eg: select array_map(x -> x is null, [null, 1, 2]); // need to create the nested column null map for column array auto nested_null_map = ColumnUInt8::create(result_col->size(), 0); result_arr = { - ColumnNullable::create( - ColumnArray::create( - ColumnNullable::create(result_col, std::move(nested_null_map)), - std::move(array_column_offset)), - std::move(outside_null_map)), - result_type, res_name}; + ColumnNullable::create( + ColumnArray::create(ColumnNullable::create( + result_col, std::move(nested_null_map)), + std::move(array_column_offset)), + std::move(outside_null_map)), + result_type, res_name}; } } else { if (res_type->is_nullable()) { result_arr = {ColumnArray::create(result_col, std::move(array_column_offset)), - result_type, res_name}; + result_type, res_name}; } else { auto nested_null_map = ColumnUInt8::create(result_col->size(), 0); - result_arr = {ColumnArray::create( - ColumnNullable::create(result_col, std::move(nested_null_map)), - std::move(array_column_offset)), - result_type, res_name}; + result_arr = {ColumnArray::create(ColumnNullable::create( + result_col, std::move(nested_null_map)), + std::move(array_column_offset)), + result_type, res_name}; } } block->insert(std::move(result_arr)); @@ -284,9 +290,9 @@ class ArrayMapFunction : public LambdaFunction { } private: - bool _contains_column_id(LambdaArgs& args, int id) { - const auto it = std::find(args.output_slot_ref_indexs.begin(), args.output_slot_ref_indexs.end(), id); + const auto it = std::find(args.output_slot_ref_indexs.begin(), + args.output_slot_ref_indexs.end(), id); return it != args.output_slot_ref_indexs.end(); } @@ -312,14 +318,17 @@ class ArrayMapFunction : public LambdaFunction { } } - void _extend_data(std::vector& columns, Block* block, LambdaArgs& args,int size) { + void _extend_data(std::vector& columns, Block* block, LambdaArgs& args, + int size) { if (!args.current_repeat_times || !size) { return; } for (int i = 0; i < size; i++) { if (_contains_column_id(args, i)) { - auto src_column = block->get_by_position(i).column->convert_to_full_column_if_const(); - columns[i]->insert_many_from(*src_column, args.current_row_idx, args.current_repeat_times); + auto src_column = + block->get_by_position(i).column->convert_to_full_column_if_const(); + columns[i]->insert_many_from(*src_column, args.current_row_idx, + args.current_repeat_times); } else { // must be column const DCHECK(is_column_const(*columns[i])); diff --git a/be/src/vec/exprs/vcolumn_ref.h b/be/src/vec/exprs/vcolumn_ref.h index 6904a9fc87c0db..d58e1375291457 100644 --- a/be/src/vec/exprs/vcolumn_ref.h +++ b/be/src/vec/exprs/vcolumn_ref.h @@ -16,11 +16,12 @@ // under the License. #pragma once +#include + #include "runtime/descriptors.h" #include "runtime/runtime_state.h" #include "vec/exprs/vexpr.h" #include "vec/functions/function.h" -#include namespace doris { namespace vectorized {