diff --git a/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp b/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp index 51b3aed079b3b9..4afa9fb8538033 100644 --- a/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp +++ b/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp @@ -398,6 +398,7 @@ void NestedLoopJoinProbeLocalState::_process_left_child_block( .get_data() .resize_fill(origin_sz + max_added_rows, 0); } else { + // TODO: for cross join, maybe could insert one row, and wrap for a const column dst_columns[i]->insert_many_from(*src_column.column, _left_block_pos, max_added_rows); } } diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h index 223420ea55aff1..febc9634c49f23 100644 --- a/be/src/pipeline/pipeline_task.h +++ b/be/src/pipeline/pipeline_task.h @@ -224,6 +224,8 @@ class PipelineTask { RuntimeState* runtime_state() const { return _state; } + RuntimeProfile* get_task_profile() const { return _task_profile.get(); } + std::string task_name() const { return fmt::format("task{}({})", _index, _pipeline->_name); } void stop_if_finished() { diff --git a/be/src/udf/udf.h b/be/src/udf/udf.h index 39af2ad1c25c13..d717c18ccec64a 100644 --- a/be/src/udf/udf.h +++ b/be/src/udf/udf.h @@ -26,6 +26,7 @@ #include #include "runtime/types.h" +#include "util/runtime_profile.h" #include "vec/common/arena.h" namespace doris { @@ -88,6 +89,12 @@ class FunctionContext { _jsonb_string_as_string = jsonb_string_as_string; } + void set_udf_execute_timer(RuntimeProfile::Counter* udf_execute_timer) { + _udf_execute_timer = udf_execute_timer; + } + + RuntimeProfile::Counter* get_udf_execute_timer() { return _udf_execute_timer; } + // Cast flag, when enable string_as_jsonb_string, string casting to jsonb will not parse string // instead just insert a string literal bool string_as_jsonb_string() const { return _string_as_jsonb_string; } @@ -176,6 +183,8 @@ class FunctionContext { std::vector> _constant_cols; + //udf execute timer + RuntimeProfile::Counter* _udf_execute_timer = nullptr; bool _check_overflow_for_decimal = false; bool _string_as_jsonb_string = false; diff --git a/be/src/vec/columns/column_array.cpp b/be/src/vec/columns/column_array.cpp index a39ac18598bb8a..4be4649fcadadc 100644 --- a/be/src/vec/columns/column_array.cpp +++ b/be/src/vec/columns/column_array.cpp @@ -845,6 +845,12 @@ void ColumnArray::insert_indices_from(const IColumn& src, const uint32_t* indice } } +void ColumnArray::insert_many_from(const IColumn& src, size_t position, size_t length) { + for (auto x = 0; x != length; ++x) { + ColumnArray::insert_from(src, position); + } +} + ColumnPtr ColumnArray::replicate(const IColumn::Offsets& replicate_offsets) const { if (replicate_offsets.empty()) return clone_empty(); diff --git a/be/src/vec/columns/column_array.h b/be/src/vec/columns/column_array.h index 39c11fcc7eb6ba..114dbfd02fb0dd 100644 --- a/be/src/vec/columns/column_array.h +++ b/be/src/vec/columns/column_array.h @@ -165,6 +165,7 @@ class ColumnArray final : public COWHelper { size_t byte_size() const override; size_t allocated_bytes() const override; ColumnPtr replicate(const IColumn::Offsets& replicate_offsets) const override; + void insert_many_from(const IColumn& src, size_t position, size_t length) override; ColumnPtr convert_to_full_column_if_const() const override; diff --git a/be/src/vec/columns/column_complex.h b/be/src/vec/columns/column_complex.h index 31aa0bc12ebab7..c8c1b78d580166 100644 --- a/be/src/vec/columns/column_complex.h +++ b/be/src/vec/columns/column_complex.h @@ -175,6 +175,14 @@ class ColumnComplexType final : public COWHelper> } } + void insert_many_from(const IColumn& src, size_t position, size_t length) override { + const Self& src_vec = assert_cast(src); + auto val = src_vec.get_element(position); + for (uint32_t i = 0; i < length; ++i) { + data.emplace_back(val); + } + } + void pop_back(size_t n) override { data.erase(data.end() - n, data.end()); } // it's impossible to use ComplexType as key , so we don't have to implement them [[noreturn]] StringRef serialize_value_into_arena(size_t n, Arena& arena, diff --git a/be/src/vec/columns/column_const.h b/be/src/vec/columns/column_const.h index b88a2a980f8795..46b93619172396 100644 --- a/be/src/vec/columns/column_const.h +++ b/be/src/vec/columns/column_const.h @@ -147,6 +147,10 @@ class ColumnConst final : public COWHelper { s += length; } + void insert_many_from(const IColumn& src, size_t position, size_t length) override { + s += length; + } + void insert_indices_from(const IColumn& src, const uint32_t* indices_begin, const uint32_t* indices_end) override { s += (indices_end - indices_begin); diff --git a/be/src/vec/columns/column_decimal.cpp b/be/src/vec/columns/column_decimal.cpp index 2e5fc5e136a508..cf0193b29e187e 100644 --- a/be/src/vec/columns/column_decimal.cpp +++ b/be/src/vec/columns/column_decimal.cpp @@ -300,6 +300,14 @@ void ColumnDecimal::insert_many_fix_len_data(const char* data_ptr, size_t num } } +template +void ColumnDecimal::insert_many_from(const IColumn& src, size_t position, size_t length) { + auto old_size = data.size(); + data.resize(old_size + length); + auto& vals = assert_cast(src).get_data(); + std::fill(&data[old_size], &data[old_size + length], vals[position]); +} + template void ColumnDecimal::insert_range_from(const IColumn& src, size_t start, size_t length) { const ColumnDecimal& src_vec = assert_cast(src); diff --git a/be/src/vec/columns/column_decimal.h b/be/src/vec/columns/column_decimal.h index 0927cb88e15abc..bdc8e932ad1d42 100644 --- a/be/src/vec/columns/column_decimal.h +++ b/be/src/vec/columns/column_decimal.h @@ -158,6 +158,8 @@ class ColumnDecimal final : public COWHelper> { memset(data.data() + old_size, 0, length * sizeof(data[0])); } + void insert_many_from(const IColumn& src, size_t position, size_t length) override; + void pop_back(size_t n) override { data.resize_assume_reserved(data.size() - n); } StringRef serialize_value_into_arena(size_t n, Arena& arena, char const*& begin) const override; diff --git a/be/src/vec/columns/column_map.cpp b/be/src/vec/columns/column_map.cpp index c00d1faba4117e..85964ca967b095 100644 --- a/be/src/vec/columns/column_map.cpp +++ b/be/src/vec/columns/column_map.cpp @@ -197,6 +197,12 @@ void ColumnMap::insert_indices_from(const IColumn& src, const uint32_t* indices_ } } +void ColumnMap::insert_many_from(const IColumn& src, size_t position, size_t length) { + for (auto x = 0; x != length; ++x) { + ColumnMap::insert_from(src, position); + } +} + StringRef ColumnMap::serialize_value_into_arena(size_t n, Arena& arena, char const*& begin) const { size_t array_size = size_at(n); size_t offset = offset_at(n); diff --git a/be/src/vec/columns/column_map.h b/be/src/vec/columns/column_map.h index 6a3baf6a9dbfab..39746604c24e25 100644 --- a/be/src/vec/columns/column_map.h +++ b/be/src/vec/columns/column_map.h @@ -131,6 +131,8 @@ class ColumnMap final : public COWHelper { void insert_indices_from(const IColumn& src, const uint32_t* indices_begin, const uint32_t* indices_end) override; + void insert_many_from(const IColumn& src, size_t position, size_t length) override; + void append_data_by_selector(MutableColumnPtr& res, const IColumn::Selector& selector) const override { return append_data_by_selector_impl(res, selector); diff --git a/be/src/vec/columns/column_nullable.cpp b/be/src/vec/columns/column_nullable.cpp index dbee5a2025aa70..54f4bd3c118d88 100644 --- a/be/src/vec/columns/column_nullable.cpp +++ b/be/src/vec/columns/column_nullable.cpp @@ -206,6 +206,12 @@ void ColumnNullable::insert_many_strings(const StringRef* strings, size_t num) { } } +void ColumnNullable::insert_many_from(const IColumn& src, size_t position, size_t length) { + const auto& nullable_col = assert_cast(src); + get_null_map_column().insert_many_from(nullable_col.get_null_map_column(), position, length); + get_nested_column().insert_many_from(*nullable_col.nested_column, position, length); +} + StringRef ColumnNullable::serialize_value_into_arena(size_t n, Arena& arena, char const*& begin) const { const auto& arr = get_null_map_data(); diff --git a/be/src/vec/columns/column_nullable.h b/be/src/vec/columns/column_nullable.h index 7772b6e80ade9b..cdf7271a6c36a5 100644 --- a/be/src/vec/columns/column_nullable.h +++ b/be/src/vec/columns/column_nullable.h @@ -198,6 +198,8 @@ class ColumnNullable final : public COWHelper, public N void insert(const Field& x) override; void insert_from(const IColumn& src, size_t n) override; + void insert_many_from(const IColumn& src, size_t position, size_t length) override; + template void insert_from_with_type(const IColumn& src, size_t n) { const auto& src_concrete = assert_cast(src); diff --git a/be/src/vec/columns/column_string.cpp b/be/src/vec/columns/column_string.cpp index 2cfd98b9657855..6eb3e45b2e015a 100644 --- a/be/src/vec/columns/column_string.cpp +++ b/be/src/vec/columns/column_string.cpp @@ -180,6 +180,28 @@ void ColumnStr::insert_range_from(const IColumn& src, size_t start, size_t le } } +template +void ColumnStr::insert_many_from(const IColumn& src, size_t position, size_t length) { + const auto& string_column = assert_cast&>(src); + auto [data_val, data_length] = string_column.get_data_at(position); + + size_t old_chars_size = chars.size(); + check_chars_length(old_chars_size + data_length * length, offsets.size() + length); + chars.resize(old_chars_size + data_length * length); + + auto old_size = offsets.size(); + offsets.resize(old_size + length); + + auto start_pos = old_size; + auto end_pos = old_size + length; + auto prev_pos = old_chars_size; + for (; start_pos < end_pos; ++start_pos) { + memcpy(&chars[prev_pos], data_val, data_length); + offsets[start_pos] = prev_pos + data_length; + prev_pos = prev_pos + data_length; + } +} + template void ColumnStr::insert_indices_from(const IColumn& src, const uint32_t* indices_begin, const uint32_t* indices_end) { diff --git a/be/src/vec/columns/column_string.h b/be/src/vec/columns/column_string.h index 8a073ef08cbc11..906f62b52aaca2 100644 --- a/be/src/vec/columns/column_string.h +++ b/be/src/vec/columns/column_string.h @@ -161,6 +161,8 @@ class ColumnStr final : public COWHelper> { offsets.push_back(new_size); } + void insert_many_from(const IColumn& src, size_t position, size_t length) override; + bool is_column_string64() const override { return sizeof(T) == sizeof(uint64_t); } void insert_from(const IColumn& src_, size_t n) override { diff --git a/be/src/vec/columns/column_struct.cpp b/be/src/vec/columns/column_struct.cpp index c08d12560c9863..ec0c5e6a6877fc 100644 --- a/be/src/vec/columns/column_struct.cpp +++ b/be/src/vec/columns/column_struct.cpp @@ -243,6 +243,13 @@ void ColumnStruct::insert_indices_from(const IColumn& src, const uint32_t* indic } } +void ColumnStruct::insert_many_from(const IColumn& src, size_t position, size_t length) { + const auto& src_concrete = assert_cast(src); + for (size_t i = 0; i < columns.size(); ++i) { + columns[i]->insert_many_from(src_concrete.get_column(i), position, length); + } +} + void ColumnStruct::insert_range_from(const IColumn& src, size_t start, size_t length) { const size_t tuple_size = columns.size(); for (size_t i = 0; i < tuple_size; ++i) { diff --git a/be/src/vec/columns/column_struct.h b/be/src/vec/columns/column_struct.h index 42581c63be89bc..8a86330b1db966 100644 --- a/be/src/vec/columns/column_struct.h +++ b/be/src/vec/columns/column_struct.h @@ -135,6 +135,8 @@ class ColumnStruct final : public COWHelper { void insert_indices_from(const IColumn& src, const uint32_t* indices_begin, const uint32_t* indices_end) override; + void insert_many_from(const IColumn& src, size_t position, size_t length) override; + void append_data_by_selector(MutableColumnPtr& res, const Selector& selector) const override { return append_data_by_selector_impl(res, selector); } diff --git a/be/src/vec/columns/column_vector.cpp b/be/src/vec/columns/column_vector.cpp index 300152b57843d3..3d3aa89243f866 100644 --- a/be/src/vec/columns/column_vector.cpp +++ b/be/src/vec/columns/column_vector.cpp @@ -431,6 +431,14 @@ size_t ColumnVector::filter(const IColumn::Filter& filter) { return new_size; } +template +void ColumnVector::insert_many_from(const IColumn& src, size_t position, size_t length) { + auto old_size = data.size(); + data.resize(old_size + length); + auto& vals = assert_cast(src).get_data(); + std::fill(&data[old_size], &data[old_size + length], vals[position]); +} + template ColumnPtr ColumnVector::permute(const IColumn::Permutation& perm, size_t limit) const { size_t size = data.size(); diff --git a/be/src/vec/columns/column_vector.h b/be/src/vec/columns/column_vector.h index 015595797b932f..df99e519f2aacb 100644 --- a/be/src/vec/columns/column_vector.h +++ b/be/src/vec/columns/column_vector.h @@ -173,6 +173,8 @@ class ColumnVector final : public COWHelper> { std::fill(data.data() + old_size, data.data() + old_size + n, val); } + void insert_many_from(const IColumn& src, size_t position, size_t length) override; + void insert_range_of_integer(T begin, T end) { if constexpr (std::is_integral_v) { auto old_size = data.size(); diff --git a/be/src/vec/exprs/vcase_expr.cpp b/be/src/vec/exprs/vcase_expr.cpp index f5fe920d931a7a..222a8f5629af77 100644 --- a/be/src/vec/exprs/vcase_expr.cpp +++ b/be/src/vec/exprs/vcase_expr.cpp @@ -86,7 +86,7 @@ Status VCaseExpr::open(RuntimeState* state, VExprContext* context, for (auto& i : _children) { RETURN_IF_ERROR(i->open(state, context, scope)); } - RETURN_IF_ERROR(VExpr::init_function_context(context, scope, _function)); + RETURN_IF_ERROR(VExpr::init_function_context(state, context, scope, _function)); if (scope == FunctionContext::FRAGMENT_LOCAL) { RETURN_IF_ERROR(VExpr::get_const_col(context, nullptr)); } diff --git a/be/src/vec/exprs/vcast_expr.cpp b/be/src/vec/exprs/vcast_expr.cpp index 7b69690385bb92..38f861add87224 100644 --- a/be/src/vec/exprs/vcast_expr.cpp +++ b/be/src/vec/exprs/vcast_expr.cpp @@ -89,7 +89,7 @@ doris::Status VCastExpr::open(doris::RuntimeState* state, VExprContext* context, for (auto& i : _children) { RETURN_IF_ERROR(i->open(state, context, scope)); } - RETURN_IF_ERROR(VExpr::init_function_context(context, scope, _function)); + RETURN_IF_ERROR(VExpr::init_function_context(state, context, scope, _function)); if (scope == FunctionContext::FRAGMENT_LOCAL) { RETURN_IF_ERROR(VExpr::get_const_col(context, nullptr)); } diff --git a/be/src/vec/exprs/vectorized_fn_call.cpp b/be/src/vec/exprs/vectorized_fn_call.cpp index 48582d83cbac76..3192653a816f2c 100644 --- a/be/src/vec/exprs/vectorized_fn_call.cpp +++ b/be/src/vec/exprs/vectorized_fn_call.cpp @@ -27,6 +27,7 @@ #include "common/config.h" #include "common/consts.h" #include "common/status.h" +#include "pipeline/pipeline_task.h" #include "runtime/runtime_state.h" #include "udf/udf.h" #include "vec/columns/column.h" @@ -125,7 +126,7 @@ Status VectorizedFnCall::open(RuntimeState* state, VExprContext* context, for (auto& i : _children) { RETURN_IF_ERROR(i->open(state, context, scope)); } - RETURN_IF_ERROR(VExpr::init_function_context(context, scope, _function)); + RETURN_IF_ERROR(VExpr::init_function_context(state, context, scope, _function)); if (scope == FunctionContext::FRAGMENT_LOCAL) { RETURN_IF_ERROR(VExpr::get_const_col(context, nullptr)); } diff --git a/be/src/vec/exprs/vexpr.cpp b/be/src/vec/exprs/vexpr.cpp index 13f67bc2eaf9aa..7e77d92c3106b6 100644 --- a/be/src/vec/exprs/vexpr.cpp +++ b/be/src/vec/exprs/vexpr.cpp @@ -31,6 +31,7 @@ #include "common/config.h" #include "common/exception.h" #include "common/status.h" +#include "pipeline/pipeline_task.h" #include "runtime/define_primitive_type.h" #include "vec/columns/column_vector.h" #include "vec/columns/columns_number.h" @@ -562,7 +563,7 @@ void VExpr::register_function_context(RuntimeState* state, VExprContext* context _fn_context_index = context->register_function_context(state, _type, arg_types); } -Status VExpr::init_function_context(VExprContext* context, +Status VExpr::init_function_context(RuntimeState* state, VExprContext* context, FunctionContext::FunctionStateScope scope, const FunctionBasePtr& function) const { FunctionContext* fn_ctx = context->fn_context(_fn_context_index); @@ -574,6 +575,12 @@ Status VExpr::init_function_context(VExprContext* context, constant_cols.push_back(const_col); } fn_ctx->set_constant_cols(constant_cols); + } else { + if (function->is_udf_function()) { + auto* timer = ADD_TIMER(state->get_task()->get_task_profile(), + "UDF[" + function->get_name() + "]"); + fn_ctx->set_udf_execute_timer(timer); + } } if (scope == FunctionContext::FRAGMENT_LOCAL) { diff --git a/be/src/vec/exprs/vexpr.h b/be/src/vec/exprs/vexpr.h index 558b4f57eabe0a..142ab6a27d2b5e 100644 --- a/be/src/vec/exprs/vexpr.h +++ b/be/src/vec/exprs/vexpr.h @@ -286,7 +286,8 @@ class VExpr { /// 1. Set constant columns result of function arguments. /// 2. Call function's prepare() to initialize function state, fragment-local or /// thread-local according the input `FunctionStateScope` argument. - Status init_function_context(VExprContext* context, FunctionContext::FunctionStateScope scope, + Status init_function_context(RuntimeState* state, VExprContext* context, + FunctionContext::FunctionStateScope scope, const FunctionBasePtr& function) const; /// Helper function to close function context, fragment-local or thread-local according diff --git a/be/src/vec/exprs/vin_predicate.cpp b/be/src/vec/exprs/vin_predicate.cpp index 2b95cc921233bd..179a30971eabb8 100644 --- a/be/src/vec/exprs/vin_predicate.cpp +++ b/be/src/vec/exprs/vin_predicate.cpp @@ -91,7 +91,7 @@ Status VInPredicate::open(RuntimeState* state, VExprContext* context, for (auto& child : _children) { RETURN_IF_ERROR(child->open(state, context, scope)); } - RETURN_IF_ERROR(VExpr::init_function_context(context, scope, _function)); + RETURN_IF_ERROR(VExpr::init_function_context(state, context, scope, _function)); if (scope == FunctionContext::FRAGMENT_LOCAL) { RETURN_IF_ERROR(VExpr::get_const_col(context, nullptr)); } diff --git a/be/src/vec/exprs/vmatch_predicate.cpp b/be/src/vec/exprs/vmatch_predicate.cpp index 813e7ba24ea919..c80933df13c0bd 100644 --- a/be/src/vec/exprs/vmatch_predicate.cpp +++ b/be/src/vec/exprs/vmatch_predicate.cpp @@ -109,7 +109,7 @@ Status VMatchPredicate::open(RuntimeState* state, VExprContext* context, for (int i = 0; i < _children.size(); ++i) { RETURN_IF_ERROR(_children[i]->open(state, context, scope)); } - RETURN_IF_ERROR(VExpr::init_function_context(context, scope, _function)); + RETURN_IF_ERROR(VExpr::init_function_context(state, context, scope, _function)); if (scope == FunctionContext::THREAD_LOCAL || scope == FunctionContext::FRAGMENT_LOCAL) { context->fn_context(_fn_context_index)->set_function_state(scope, _inverted_index_ctx); } diff --git a/be/src/vec/functions/function.h b/be/src/vec/functions/function.h index f618cc9a1b0ffb..4702a4b7af0bbf 100644 --- a/be/src/vec/functions/function.h +++ b/be/src/vec/functions/function.h @@ -210,6 +210,8 @@ class IFunctionBase { virtual bool is_use_default_implementation_for_constants() const = 0; + virtual bool is_udf_function() const { return false; } + /// The property of monotonicity for a certain range. struct Monotonicity { bool is_monotonic = false; /// Is the function monotonous (nondecreasing or nonincreasing). diff --git a/be/src/vec/functions/function_java_udf.cpp b/be/src/vec/functions/function_java_udf.cpp index 63985f1964c42e..e2c441b660201d 100644 --- a/be/src/vec/functions/function_java_udf.cpp +++ b/be/src/vec/functions/function_java_udf.cpp @@ -44,6 +44,7 @@ Status JavaFunctionCall::open(FunctionContext* context, FunctionContext::Functio RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env)); if (scope == FunctionContext::FunctionStateScope::THREAD_LOCAL) { + SCOPED_TIMER(context->get_udf_execute_timer()); std::shared_ptr jni_ctx = std::make_shared(); context->set_function_state(FunctionContext::THREAD_LOCAL, jni_ctx); @@ -96,7 +97,7 @@ Status JavaFunctionCall::execute_impl(FunctionContext* context, Block& block, RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env)); JniContext* jni_ctx = reinterpret_cast( context->get_function_state(FunctionContext::THREAD_LOCAL)); - + SCOPED_TIMER(context->get_udf_execute_timer()); std::unique_ptr input_table; RETURN_IF_ERROR(JniConnector::to_java_table(&block, num_rows, arguments, input_table)); auto input_table_schema = JniConnector::parse_table_schema(&block, arguments, true); diff --git a/be/src/vec/functions/function_java_udf.h b/be/src/vec/functions/function_java_udf.h index e507392184f85d..e35fc67881acb5 100644 --- a/be/src/vec/functions/function_java_udf.h +++ b/be/src/vec/functions/function_java_udf.h @@ -107,6 +107,8 @@ class JavaFunctionCall : public IFunctionBase { bool is_use_default_implementation_for_constants() const override { return true; } + bool is_udf_function() const override { return true; } + private: const TFunction& fn_; const DataTypes _argument_types;