diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.cpp b/be/src/olap/rowset/segment_v2/segment_iterator.cpp index 2c7aa5a0ce1cd0..275cedaeb1fbb7 100644 --- a/be/src/olap/rowset/segment_v2/segment_iterator.cpp +++ b/be/src/olap/rowset/segment_v2/segment_iterator.cpp @@ -2167,11 +2167,10 @@ Status SegmentIterator::_next_batch_internal(vectorized::Block* block) { if (block->rows() == 0) { vectorized::MutableColumnPtr col0 = std::move(*block->get_by_position(0).column).mutate(); - auto res_column = vectorized::ColumnString::create(); - res_column->insert_data("", 0); - auto col_const = vectorized::ColumnConst::create(std::move(res_column), - selected_size); - block->replace_by_position(0, std::move(col_const)); + auto tmp_indicator_col = + block->get_by_position(0).type->create_column_const_with_default_value( + selected_size); + block->replace_by_position(0, std::move(tmp_indicator_col)); _output_index_result_column_for_expr(_sel_rowid_idx.data(), selected_size, block); block->shrink_char_type_column_suffix_zero(_char_type_idx_no_0); diff --git a/be/src/vec/exprs/lambda_function/lambda_function.h b/be/src/vec/exprs/lambda_function/lambda_function.h index 184b4c2cc38abf..07302954688271 100644 --- a/be/src/vec/exprs/lambda_function/lambda_function.h +++ b/be/src/vec/exprs/lambda_function/lambda_function.h @@ -17,7 +17,7 @@ #pragma once -#include +#include #include "common/status.h" #include "vec/core/block.h" @@ -31,9 +31,16 @@ class LambdaFunction { virtual std::string get_name() const = 0; + virtual doris::Status prepare(RuntimeState* state) { + batch_size = state->batch_size(); + return Status::OK(); + } + virtual doris::Status execute(VExprContext* context, doris::vectorized::Block* block, int* result_column_id, const DataTypePtr& result_type, const VExprSPtrs& children) = 0; + + int batch_size; }; using LambdaFunctionPtr = std::shared_ptr; diff --git a/be/src/vec/exprs/lambda_function/varray_map_function.cpp b/be/src/vec/exprs/lambda_function/varray_map_function.cpp index f80cffa166eac4..f28485c7df3981 100644 --- a/be/src/vec/exprs/lambda_function/varray_map_function.cpp +++ b/be/src/vec/exprs/lambda_function/varray_map_function.cpp @@ -17,7 +17,11 @@ #include #include -#include +#include +#include +#include +#include +#include #include "common/status.h" #include "vec/aggregate_functions/aggregate_function.h" @@ -42,6 +46,17 @@ namespace doris::vectorized { class VExprContext; +struct LambdaArgs { + std::vector output_slot_ref_indexs; + int64_t current_row_idx = 0; + int64_t current_offset_in_array = 0; + size_t array_start = 0; + int64_t cur_size = 0; + const ColumnArray::Offsets64* offsets_ptr = nullptr; + int current_repeat_times = 0; + bool eos = false; +}; + class ArrayMapFunction : public LambdaFunction { ENABLE_FACTORY_CREATOR(ArrayMapFunction); @@ -57,8 +72,32 @@ class ArrayMapFunction : public LambdaFunction { doris::Status execute(VExprContext* context, doris::vectorized::Block* block, int* result_column_id, const DataTypePtr& result_type, const VExprSPtrs& children) override { - ///* array_map(lambda,arg1,arg2,.....) */// + LambdaArgs args; + // collect used slot ref in lambda function body + _collect_slot_ref_column_id(children[0], args); + + int gap = 0; + if (!args.output_slot_ref_indexs.empty()) { + auto max_id = std::max_element(args.output_slot_ref_indexs.begin(), args.output_slot_ref_indexs.end()); + gap = *max_id + 1; + _set_column_ref_column_id(children[0], gap); + } + + std::vector names(gap); + DataTypes data_types(gap); + for (int i = 0; i < gap; ++i) { + if (_contains_column_id(args, i)) { + names[i] = block->get_by_position(i).name; + data_types[i] = block->get_by_position(i).type; + } else { + // padding some mock data + names[i] = "temp"; + data_types[i] = std::make_shared(); + } + } + + ///* array_map(lambda,arg1,arg2,.....) */// //1. child[1:end]->execute(src_block) doris::vectorized::ColumnNumbers arguments(children.size() - 1); for (int i = 1; i < children.size(); ++i) { @@ -70,21 +109,20 @@ class ArrayMapFunction : public LambdaFunction { // used for save column array outside null map auto outside_null_map = ColumnUInt8::create(block->get_by_position(arguments[0]) - .column->convert_to_full_column_if_const() - ->size(), + .column->convert_to_full_column_if_const() + ->size(), 0); // offset column MutableColumnPtr array_column_offset; size_t nested_array_column_rows = 0; ColumnPtr first_array_offsets = nullptr; - //2. get the result column from executed expr, and the needed is nested column of array - Block lambda_block; + std::vector lambda_datas(arguments.size()); + for (int i = 0; i < arguments.size(); ++i) { const auto& array_column_type_name = block->get_by_position(arguments[i]); auto column_array = array_column_type_name.column->convert_to_full_column_if_const(); auto type_array = array_column_type_name.type; - if (type_array->is_nullable()) { // get the nullmap of nullable column const auto& column_array_nullmap = @@ -92,11 +130,11 @@ class ArrayMapFunction : public LambdaFunction { // get the array column from nullable column column_array = assert_cast(column_array.get()) - ->get_nested_column_ptr(); + ->get_nested_column_ptr(); // get the nested type from nullable type type_array = assert_cast(array_column_type_name.type.get()) - ->get_nested_type(); + ->get_nested_type(); // need to union nullmap from all columns VectorizedUtils::update_null_map(outside_null_map->get_data(), @@ -113,75 +151,172 @@ class ArrayMapFunction : public LambdaFunction { const auto& off_data = assert_cast( col_array.get_offsets_column()); array_column_offset = off_data.clone_resized(col_array.get_offsets_column().size()); + args.offsets_ptr = &col_array.get_offsets(); } else { // select array_map((x,y)->x+y,c_array1,[0,1,2,3]) from array_test2; // c_array1: [0,1,2,3,4,5,6,7,8,9] const auto& array_offsets = assert_cast(*first_array_offsets) - .get_data(); + .get_data(); if (nested_array_column_rows != col_array.get_data_ptr()->size() || (!array_offsets.empty() && memcmp(array_offsets.data(), col_array.get_offsets().data(), sizeof(array_offsets[0]) * array_offsets.size()) != 0)) { return Status::InvalidArgument( - "in array map function, the input column size " - "are " - "not equal completely, nested column data rows 1st size is {}, {}th " - "size is {}.", - nested_array_column_rows, i + 1, col_array.get_data_ptr()->size()); + "in array map function, the input column size " + "are " + "not equal completely, nested column data rows 1st size is {}, {}th " + "size is {}.", + nested_array_column_rows, i + 1, col_array.get_data_ptr()->size()); } } - - // insert the data column to the new block - ColumnWithTypeAndName data_column {col_array.get_data_ptr(), col_type.get_nested_type(), - "R" + array_column_type_name.name}; - lambda_block.insert(std::move(data_column)); + lambda_datas[i] = col_array.get_data_ptr(); + names.push_back("R" + array_column_type_name.name); + data_types.push_back(col_type.get_nested_type()); } - //3. child[0]->execute(new_block) - RETURN_IF_ERROR(children[0]->execute(context, &lambda_block, result_column_id)); + ColumnPtr result_col = nullptr; + DataTypePtr res_type; + std::string res_name; + + //process first row + args.array_start = (*args.offsets_ptr)[args.current_row_idx - 1]; + args.cur_size = (*args.offsets_ptr)[args.current_row_idx] - args.array_start; + + while (args.current_row_idx < block->rows()) { + Block lambda_block; + for (int i = 0; i < names.size(); i++) { + ColumnWithTypeAndName data_column {data_types[i], names[i]}; + lambda_block.insert(std::move(data_column)); + } + + MutableBlock m_lambda_block(&lambda_block); + MutableColumns& columns = m_lambda_block.mutable_columns(); + while (columns[gap]->size() < batch_size) { + long max_step = batch_size - columns[gap]->size(); + long current_step = std::min(max_step, (long)(args.cur_size - args.current_offset_in_array)); + size_t pos = args.array_start + args.current_offset_in_array; + for (int i = 0; i < arguments.size(); ++i) { + columns[gap + i]->insert_range_from(*lambda_datas[i], pos, current_step); + } + args.current_offset_in_array += current_step; + args.current_repeat_times += current_step; + if (args.current_offset_in_array >= args.cur_size) { + args.eos = true; + } else { + _extend_data(columns, block, args, gap); + } + if (args.eos) { + _extend_data(columns, block, args, gap); + args.current_row_idx++; + args.current_offset_in_array = 0; + if (args.current_row_idx >= block->rows()) { + break; + } + args.eos = false; + args.array_start = (*args.offsets_ptr)[args.current_row_idx - 1]; + args.cur_size = (*args.offsets_ptr)[args.current_row_idx] - args.array_start; + } + } + + lambda_block.set_columns(std::move(columns)); - auto res_col = lambda_block.get_by_position(*result_column_id) - .column->convert_to_full_column_if_const(); - auto res_type = lambda_block.get_by_position(*result_column_id).type; - auto res_name = lambda_block.get_by_position(*result_column_id).name; + //3. child[0]->execute(new_block) + RETURN_IF_ERROR(children[0]->execute(context, &lambda_block, result_column_id)); + + auto res_col = lambda_block.get_by_position(*result_column_id) + .column->convert_to_full_column_if_const(); + res_type = lambda_block.get_by_position(*result_column_id).type; + res_name = lambda_block.get_by_position(*result_column_id).name; + if (!result_col) { + result_col = std::move(res_col); + } else { + MutableColumnPtr column = (*std::move(result_col)).mutate(); + column->insert_range_from(*res_col, 0, res_col->size()); + } + } //4. get the result column after execution, reassemble it into a new array column, and return. ColumnWithTypeAndName result_arr; if (result_type->is_nullable()) { if (res_type->is_nullable()) { result_arr = {ColumnNullable::create( - ColumnArray::create(res_col, std::move(array_column_offset)), - std::move(outside_null_map)), - result_type, res_name}; + ColumnArray::create(result_col, std::move(array_column_offset)), + std::move(outside_null_map)), + result_type, res_name}; } else { // deal with eg: select array_map(x -> x is null, [null, 1, 2]); // need to create the nested column null map for column array - auto nested_null_map = ColumnUInt8::create(res_col->size(), 0); + auto nested_null_map = ColumnUInt8::create(result_col->size(), 0); result_arr = { - ColumnNullable::create( - ColumnArray::create( - ColumnNullable::create(res_col, std::move(nested_null_map)), - std::move(array_column_offset)), - std::move(outside_null_map)), - result_type, res_name}; + ColumnNullable::create( + ColumnArray::create( + ColumnNullable::create(result_col, std::move(nested_null_map)), + std::move(array_column_offset)), + std::move(outside_null_map)), + result_type, res_name}; } } else { if (res_type->is_nullable()) { - result_arr = {ColumnArray::create(res_col, std::move(array_column_offset)), - result_type, res_name}; + result_arr = {ColumnArray::create(result_col, std::move(array_column_offset)), + result_type, res_name}; } else { - auto nested_null_map = ColumnUInt8::create(res_col->size(), 0); + auto nested_null_map = ColumnUInt8::create(result_col->size(), 0); result_arr = {ColumnArray::create( - ColumnNullable::create(res_col, std::move(nested_null_map)), - std::move(array_column_offset)), - result_type, res_name}; + ColumnNullable::create(result_col, std::move(nested_null_map)), + std::move(array_column_offset)), + result_type, res_name}; } } block->insert(std::move(result_arr)); *result_column_id = block->columns() - 1; + return Status::OK(); } + +private: + + bool _contains_column_id(LambdaArgs& args, int id) { + const auto it = std::find(args.output_slot_ref_indexs.begin(), args.output_slot_ref_indexs.end(), id); + return it != args.output_slot_ref_indexs.end(); + } + + void _set_column_ref_column_id(VExprSPtr expr, int gap) { + for (const auto& child : expr->children()) { + if (child->is_column_ref()) { + auto* ref = static_cast(child.get()); + ref->set_gap(gap); + } else { + _set_column_ref_column_id(child, gap); + } + } + } + + void _collect_slot_ref_column_id(VExprSPtr expr, LambdaArgs& args) { + for (const auto& child : expr->children()) { + if (child->is_slot_ref()) { + const auto* ref = static_cast(child.get()); + args.output_slot_ref_indexs.push_back(ref->column_id()); + } else { + _collect_slot_ref_column_id(child, args); + } + } + } + + void _extend_data(std::vector& columns, Block* block, LambdaArgs& args,int size) { + if (!args.current_repeat_times || !size) { + return; + } + for (int i = 0; i < size; i++) { + if (_contains_column_id(args, i)) { + auto src_column = block->get_by_position(i).column->convert_to_full_column_if_const(); + columns[i]->insert_many_from(*src_column, args.current_row_idx, args.current_repeat_times); + } else { + columns[i]->insert_many_defaults(args.current_repeat_times); + } + } + args.current_repeat_times = 0; + } }; void register_function_array_map(doris::vectorized::LambdaFunctionFactory& factory) { diff --git a/be/src/vec/exprs/vcolumn_ref.h b/be/src/vec/exprs/vcolumn_ref.h index a763797880e361..6904a9fc87c0db 100644 --- a/be/src/vec/exprs/vcolumn_ref.h +++ b/be/src/vec/exprs/vcolumn_ref.h @@ -20,6 +20,7 @@ #include "runtime/runtime_state.h" #include "vec/exprs/vexpr.h" #include "vec/functions/function.h" +#include namespace doris { namespace vectorized { @@ -57,7 +58,7 @@ class VColumnRef final : public VExpr { Status execute(VExprContext* context, Block* block, int* result_column_id) override { DCHECK(_open_finished || _getting_const_col); - *result_column_id = _column_id; + *result_column_id = _column_id + _gap; return Status::OK(); } @@ -67,6 +68,12 @@ class VColumnRef final : public VExpr { const std::string& expr_name() const override { return _column_name; } + void set_gap(int gap) { + if (_gap == 0) { + _gap = gap; + } + } + std::string debug_string() const override { std::stringstream out; out << "VColumnRef(slot_id: " << _column_id << ",column_name: " << _column_name @@ -76,6 +83,7 @@ class VColumnRef final : public VExpr { private: int _column_id; + std::atomic _gap = 0; std::string _column_name; }; } // namespace vectorized diff --git a/be/src/vec/exprs/vexpr.h b/be/src/vec/exprs/vexpr.h index 3456fb431a48cb..2580cf7ddc88f9 100644 --- a/be/src/vec/exprs/vexpr.h +++ b/be/src/vec/exprs/vexpr.h @@ -148,6 +148,9 @@ class VExpr { TypeDescriptor type() { return _type; } bool is_slot_ref() const { return _node_type == TExprNodeType::SLOT_REF; } + + bool is_column_ref() const { return _node_type == TExprNodeType::COLUMN_REF; } + virtual bool is_literal() const { return false; } TExprNodeType::type node_type() const { return _node_type; } diff --git a/be/src/vec/exprs/vlambda_function_call_expr.h b/be/src/vec/exprs/vlambda_function_call_expr.h index 44d22b1f9ebbea..7b0ea1dcb5a75d 100644 --- a/be/src/vec/exprs/vlambda_function_call_expr.h +++ b/be/src/vec/exprs/vlambda_function_call_expr.h @@ -50,6 +50,7 @@ class VLambdaFunctionCallExpr : public VExpr { return Status::InternalError("Lambda Function {} is not implemented.", _fn.name.function_name); } + RETURN_IF_ERROR(_lambda_function->prepare(state)); _prepare_finished = true; return Status::OK(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/Scope.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/Scope.java index dbcbea7c104b5a..7c77c680ff287b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/Scope.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/Scope.java @@ -18,7 +18,6 @@ package org.apache.doris.nereids.analyzer; import org.apache.doris.nereids.trees.expressions.Slot; -import org.apache.doris.nereids.trees.expressions.SubqueryExpr; import org.apache.doris.nereids.util.Utils; import com.google.common.base.Suppliers; @@ -62,20 +61,18 @@ public class Scope { private final Optional outerScope; private final List slots; - private final Optional ownerSubquery; private final Set correlatedSlots; private final boolean buildNameToSlot; private final Supplier> nameToSlot; public Scope(List slots) { - this(Optional.empty(), slots, Optional.empty()); + this(Optional.empty(), slots); } /** Scope */ - public Scope(Optional outerScope, List slots, Optional subqueryExpr) { + public Scope(Optional outerScope, List slots) { this.outerScope = Objects.requireNonNull(outerScope, "outerScope can not be null"); this.slots = Utils.fastToImmutableList(Objects.requireNonNull(slots, "slots can not be null")); - this.ownerSubquery = Objects.requireNonNull(subqueryExpr, "subqueryExpr can not be null"); this.correlatedSlots = Sets.newLinkedHashSet(); this.buildNameToSlot = slots.size() > 500; this.nameToSlot = buildNameToSlot ? Suppliers.memoize(this::buildNameToSlot) : null; @@ -89,10 +86,6 @@ public Optional getOuterScope() { return outerScope; } - public Optional getSubquery() { - return ownerSubquery; - } - public Set getCorrelatedSlots() { return correlatedSlots; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindExpression.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindExpression.java index cde659b91021ad..1e481542baec12 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindExpression.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindExpression.java @@ -1229,7 +1229,7 @@ private E checkBoundExceptLambda(E expression, Plan plan) private Scope toScope(CascadesContext cascadesContext, List slots) { Optional outerScope = cascadesContext.getOuterScope(); if (outerScope.isPresent()) { - return new Scope(outerScope, slots, outerScope.get().getSubquery()); + return new Scope(outerScope, slots); } else { return new Scope(slots); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/ExpressionAnalyzer.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/ExpressionAnalyzer.java index 49789aa66e1ff8..84df12b66ee37d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/ExpressionAnalyzer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/ExpressionAnalyzer.java @@ -781,8 +781,9 @@ private UnboundFunction bindHighOrderFunction(UnboundFunction unboundFunction, E .map(ArrayItemReference::toSlot) .collect(ImmutableList.toImmutableList()); - ExpressionAnalyzer lambdaAnalyzer = new ExpressionAnalyzer(currentPlan, new Scope(boundedSlots), - context == null ? null : context.cascadesContext, true, false) { + ExpressionAnalyzer lambdaAnalyzer = new ExpressionAnalyzer(currentPlan, new Scope(Optional.of(getScope()), + boundedSlots), context == null ? null : context.cascadesContext, + true, true) { @Override protected void couldNotFoundColumn(UnboundSlot unboundSlot, String tableName) { throw new AnalysisException("Unknown lambda slot '" diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/SubExprAnalyzer.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/SubExprAnalyzer.java index 7b0ed45708251d..9a70ce24afb8ef 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/SubExprAnalyzer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/SubExprAnalyzer.java @@ -53,7 +53,6 @@ import java.util.HashSet; import java.util.List; import java.util.Objects; -import java.util.Optional; import java.util.Set; /** @@ -205,19 +204,15 @@ private AnalyzedResult analyzeSubquery(SubqueryExpr expr) { } CascadesContext subqueryContext = CascadesContext.newContextWithCteContext( cascadesContext, expr.getQueryPlan(), cascadesContext.getCteContext()); - Scope subqueryScope = genScopeWithSubquery(expr); + // don't use `getScope()` because we only need `getScope().getOuterScope()` and `getScope().getSlots()` + // otherwise unexpected errors may occur + Scope subqueryScope = new Scope(getScope().getOuterScope(), getScope().getSlots()); subqueryContext.setOuterScope(subqueryScope); subqueryContext.newAnalyzer().analyze(); return new AnalyzedResult((LogicalPlan) subqueryContext.getRewritePlan(), subqueryScope.getCorrelatedSlots()); } - private Scope genScopeWithSubquery(SubqueryExpr expr) { - return new Scope(getScope().getOuterScope(), - getScope().getSlots(), - Optional.ofNullable(expr)); - } - public Scope getScope() { return scope; } diff --git a/regression-test/suites/query_p0/sql_functions/array_functions/test_array_map_function.groovy b/regression-test/suites/query_p0/sql_functions/array_functions/test_array_map_function.groovy index 311b188e433550..1fd0c8ec147201 100644 --- a/regression-test/suites/query_p0/sql_functions/array_functions/test_array_map_function.groovy +++ b/regression-test/suites/query_p0/sql_functions/array_functions/test_array_map_function.groovy @@ -90,8 +90,7 @@ suite("test_array_map_function") { test { sql"""select /*+SET_VAR(experimental_enable_pipeline_engine=false)*/ array_map((x,y)->x+y, c_array1, c_array2) from ${tableName} where id > 10 order by id""" check{result, exception, startTime, endTime -> - assertTrue(exception != null) - logger.info(exception.message) + assertTrue(exception == null) } } diff --git a/regression-test/suites/query_p0/sql_functions/datetime_functions/test_convert_tz.groovy b/regression-test/suites/query_p0/sql_functions/datetime_functions/test_convert_tz.groovy index 2959c7f136b8e5..e382fe9ae70b84 100644 --- a/regression-test/suites/query_p0/sql_functions/datetime_functions/test_convert_tz.groovy +++ b/regression-test/suites/query_p0/sql_functions/datetime_functions/test_convert_tz.groovy @@ -16,6 +16,7 @@ // under the License. suite("test_convert_tz") { + sql "drop table if exists `cvt_tz`" sql """ CREATE TABLE `cvt_tz` ( `rowid` int NULL, @@ -34,4 +35,4 @@ suite("test_convert_tz") { order_qt_sql1 """ select convert_tz(dt, '+00:00', IF(property_value IS NULL, '+00:00', property_value)) from cvt_tz """ -} \ No newline at end of file +}