Skip to content

Commit

Permalink
[cherry-pick](branch-30) pick prs (#41779) (#41623) (#44406)
Browse files Browse the repository at this point in the history
cherry-pick from master (#41779)  (#41623)
  • Loading branch information
zhangstar333 authored Nov 27, 2024
1 parent 03b1971 commit 26beabb
Show file tree
Hide file tree
Showing 29 changed files with 122 additions and 8 deletions.
1 change: 1 addition & 0 deletions be/src/pipeline/exec/nested_loop_join_probe_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,7 @@ void NestedLoopJoinProbeLocalState::_process_left_child_block(
.get_data()
.resize_fill(origin_sz + max_added_rows, 0);
} else {
// TODO: for cross join, maybe could insert one row, and wrap for a const column
dst_columns[i]->insert_many_from(*src_column.column, _left_block_pos, max_added_rows);
}
}
Expand Down
2 changes: 2 additions & 0 deletions be/src/pipeline/pipeline_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,8 @@ class PipelineTask {

RuntimeState* runtime_state() const { return _state; }

RuntimeProfile* get_task_profile() const { return _task_profile.get(); }

std::string task_name() const { return fmt::format("task{}({})", _index, _pipeline->_name); }

void stop_if_finished() {
Expand Down
9 changes: 9 additions & 0 deletions be/src/udf/udf.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <vector>

#include "runtime/types.h"
#include "util/runtime_profile.h"
#include "vec/common/arena.h"

namespace doris {
Expand Down Expand Up @@ -88,6 +89,12 @@ class FunctionContext {
_jsonb_string_as_string = jsonb_string_as_string;
}

void set_udf_execute_timer(RuntimeProfile::Counter* udf_execute_timer) {
_udf_execute_timer = udf_execute_timer;
}

RuntimeProfile::Counter* get_udf_execute_timer() { return _udf_execute_timer; }

// Cast flag, when enable string_as_jsonb_string, string casting to jsonb will not parse string
// instead just insert a string literal
bool string_as_jsonb_string() const { return _string_as_jsonb_string; }
Expand Down Expand Up @@ -176,6 +183,8 @@ class FunctionContext {

std::vector<std::shared_ptr<doris::ColumnPtrWrapper>> _constant_cols;

//udf execute timer
RuntimeProfile::Counter* _udf_execute_timer = nullptr;
bool _check_overflow_for_decimal = false;

bool _string_as_jsonb_string = false;
Expand Down
6 changes: 6 additions & 0 deletions be/src/vec/columns/column_array.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -845,6 +845,12 @@ void ColumnArray::insert_indices_from(const IColumn& src, const uint32_t* indice
}
}

void ColumnArray::insert_many_from(const IColumn& src, size_t position, size_t length) {
for (auto x = 0; x != length; ++x) {
ColumnArray::insert_from(src, position);
}
}

ColumnPtr ColumnArray::replicate(const IColumn::Offsets& replicate_offsets) const {
if (replicate_offsets.empty()) return clone_empty();

Expand Down
1 change: 1 addition & 0 deletions be/src/vec/columns/column_array.h
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ class ColumnArray final : public COWHelper<IColumn, ColumnArray> {
size_t byte_size() const override;
size_t allocated_bytes() const override;
ColumnPtr replicate(const IColumn::Offsets& replicate_offsets) const override;
void insert_many_from(const IColumn& src, size_t position, size_t length) override;

ColumnPtr convert_to_full_column_if_const() const override;

Expand Down
8 changes: 8 additions & 0 deletions be/src/vec/columns/column_complex.h
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,14 @@ class ColumnComplexType final : public COWHelper<IColumn, ColumnComplexType<T>>
}
}

void insert_many_from(const IColumn& src, size_t position, size_t length) override {
const Self& src_vec = assert_cast<const Self&>(src);
auto val = src_vec.get_element(position);
for (uint32_t i = 0; i < length; ++i) {
data.emplace_back(val);
}
}

void pop_back(size_t n) override { data.erase(data.end() - n, data.end()); }
// it's impossible to use ComplexType as key , so we don't have to implement them
[[noreturn]] StringRef serialize_value_into_arena(size_t n, Arena& arena,
Expand Down
4 changes: 4 additions & 0 deletions be/src/vec/columns/column_const.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,10 @@ class ColumnConst final : public COWHelper<IColumn, ColumnConst> {
s += length;
}

void insert_many_from(const IColumn& src, size_t position, size_t length) override {
s += length;
}

void insert_indices_from(const IColumn& src, const uint32_t* indices_begin,
const uint32_t* indices_end) override {
s += (indices_end - indices_begin);
Expand Down
8 changes: 8 additions & 0 deletions be/src/vec/columns/column_decimal.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,14 @@ void ColumnDecimal<T>::insert_many_fix_len_data(const char* data_ptr, size_t num
}
}

template <typename T>
void ColumnDecimal<T>::insert_many_from(const IColumn& src, size_t position, size_t length) {
auto old_size = data.size();
data.resize(old_size + length);
auto& vals = assert_cast<const Self&>(src).get_data();
std::fill(&data[old_size], &data[old_size + length], vals[position]);
}

template <typename T>
void ColumnDecimal<T>::insert_range_from(const IColumn& src, size_t start, size_t length) {
const ColumnDecimal& src_vec = assert_cast<const ColumnDecimal&>(src);
Expand Down
2 changes: 2 additions & 0 deletions be/src/vec/columns/column_decimal.h
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,8 @@ class ColumnDecimal final : public COWHelper<IColumn, ColumnDecimal<T>> {
memset(data.data() + old_size, 0, length * sizeof(data[0]));
}

void insert_many_from(const IColumn& src, size_t position, size_t length) override;

void pop_back(size_t n) override { data.resize_assume_reserved(data.size() - n); }

StringRef serialize_value_into_arena(size_t n, Arena& arena, char const*& begin) const override;
Expand Down
6 changes: 6 additions & 0 deletions be/src/vec/columns/column_map.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,12 @@ void ColumnMap::insert_indices_from(const IColumn& src, const uint32_t* indices_
}
}

void ColumnMap::insert_many_from(const IColumn& src, size_t position, size_t length) {
for (auto x = 0; x != length; ++x) {
ColumnMap::insert_from(src, position);
}
}

StringRef ColumnMap::serialize_value_into_arena(size_t n, Arena& arena, char const*& begin) const {
size_t array_size = size_at(n);
size_t offset = offset_at(n);
Expand Down
2 changes: 2 additions & 0 deletions be/src/vec/columns/column_map.h
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,8 @@ class ColumnMap final : public COWHelper<IColumn, ColumnMap> {
void insert_indices_from(const IColumn& src, const uint32_t* indices_begin,
const uint32_t* indices_end) override;

void insert_many_from(const IColumn& src, size_t position, size_t length) override;

void append_data_by_selector(MutableColumnPtr& res,
const IColumn::Selector& selector) const override {
return append_data_by_selector_impl<ColumnMap>(res, selector);
Expand Down
6 changes: 6 additions & 0 deletions be/src/vec/columns/column_nullable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,12 @@ void ColumnNullable::insert_many_strings(const StringRef* strings, size_t num) {
}
}

void ColumnNullable::insert_many_from(const IColumn& src, size_t position, size_t length) {
const auto& nullable_col = assert_cast<const ColumnNullable&>(src);
get_null_map_column().insert_many_from(nullable_col.get_null_map_column(), position, length);
get_nested_column().insert_many_from(*nullable_col.nested_column, position, length);
}

StringRef ColumnNullable::serialize_value_into_arena(size_t n, Arena& arena,
char const*& begin) const {
const auto& arr = get_null_map_data();
Expand Down
2 changes: 2 additions & 0 deletions be/src/vec/columns/column_nullable.h
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,8 @@ class ColumnNullable final : public COWHelper<IColumn, ColumnNullable>, public N
void insert(const Field& x) override;
void insert_from(const IColumn& src, size_t n) override;

void insert_many_from(const IColumn& src, size_t position, size_t length) override;

template <typename ColumnType>
void insert_from_with_type(const IColumn& src, size_t n) {
const auto& src_concrete = assert_cast<const ColumnNullable&>(src);
Expand Down
22 changes: 22 additions & 0 deletions be/src/vec/columns/column_string.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,28 @@ void ColumnStr<T>::insert_range_from(const IColumn& src, size_t start, size_t le
}
}

template <typename T>
void ColumnStr<T>::insert_many_from(const IColumn& src, size_t position, size_t length) {
const auto& string_column = assert_cast<const ColumnStr<T>&>(src);
auto [data_val, data_length] = string_column.get_data_at(position);

size_t old_chars_size = chars.size();
check_chars_length(old_chars_size + data_length * length, offsets.size() + length);
chars.resize(old_chars_size + data_length * length);

auto old_size = offsets.size();
offsets.resize(old_size + length);

auto start_pos = old_size;
auto end_pos = old_size + length;
auto prev_pos = old_chars_size;
for (; start_pos < end_pos; ++start_pos) {
memcpy(&chars[prev_pos], data_val, data_length);
offsets[start_pos] = prev_pos + data_length;
prev_pos = prev_pos + data_length;
}
}

template <typename T>
void ColumnStr<T>::insert_indices_from(const IColumn& src, const uint32_t* indices_begin,
const uint32_t* indices_end) {
Expand Down
2 changes: 2 additions & 0 deletions be/src/vec/columns/column_string.h
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,8 @@ class ColumnStr final : public COWHelper<IColumn, ColumnStr<T>> {
offsets.push_back(new_size);
}

void insert_many_from(const IColumn& src, size_t position, size_t length) override;

bool is_column_string64() const override { return sizeof(T) == sizeof(uint64_t); }

void insert_from(const IColumn& src_, size_t n) override {
Expand Down
7 changes: 7 additions & 0 deletions be/src/vec/columns/column_struct.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,13 @@ void ColumnStruct::insert_indices_from(const IColumn& src, const uint32_t* indic
}
}

void ColumnStruct::insert_many_from(const IColumn& src, size_t position, size_t length) {
const auto& src_concrete = assert_cast<const ColumnStruct&>(src);
for (size_t i = 0; i < columns.size(); ++i) {
columns[i]->insert_many_from(src_concrete.get_column(i), position, length);
}
}

void ColumnStruct::insert_range_from(const IColumn& src, size_t start, size_t length) {
const size_t tuple_size = columns.size();
for (size_t i = 0; i < tuple_size; ++i) {
Expand Down
2 changes: 2 additions & 0 deletions be/src/vec/columns/column_struct.h
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,8 @@ class ColumnStruct final : public COWHelper<IColumn, ColumnStruct> {
void insert_indices_from(const IColumn& src, const uint32_t* indices_begin,
const uint32_t* indices_end) override;

void insert_many_from(const IColumn& src, size_t position, size_t length) override;

void append_data_by_selector(MutableColumnPtr& res, const Selector& selector) const override {
return append_data_by_selector_impl<ColumnStruct>(res, selector);
}
Expand Down
8 changes: 8 additions & 0 deletions be/src/vec/columns/column_vector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,14 @@ size_t ColumnVector<T>::filter(const IColumn::Filter& filter) {
return new_size;
}

template <typename T>
void ColumnVector<T>::insert_many_from(const IColumn& src, size_t position, size_t length) {
auto old_size = data.size();
data.resize(old_size + length);
auto& vals = assert_cast<const Self&>(src).get_data();
std::fill(&data[old_size], &data[old_size + length], vals[position]);
}

template <typename T>
ColumnPtr ColumnVector<T>::permute(const IColumn::Permutation& perm, size_t limit) const {
size_t size = data.size();
Expand Down
2 changes: 2 additions & 0 deletions be/src/vec/columns/column_vector.h
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,8 @@ class ColumnVector final : public COWHelper<IColumn, ColumnVector<T>> {
std::fill(data.data() + old_size, data.data() + old_size + n, val);
}

void insert_many_from(const IColumn& src, size_t position, size_t length) override;

void insert_range_of_integer(T begin, T end) {
if constexpr (std::is_integral_v<T>) {
auto old_size = data.size();
Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/exprs/vcase_expr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ Status VCaseExpr::open(RuntimeState* state, VExprContext* context,
for (auto& i : _children) {
RETURN_IF_ERROR(i->open(state, context, scope));
}
RETURN_IF_ERROR(VExpr::init_function_context(context, scope, _function));
RETURN_IF_ERROR(VExpr::init_function_context(state, context, scope, _function));
if (scope == FunctionContext::FRAGMENT_LOCAL) {
RETURN_IF_ERROR(VExpr::get_const_col(context, nullptr));
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/exprs/vcast_expr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ doris::Status VCastExpr::open(doris::RuntimeState* state, VExprContext* context,
for (auto& i : _children) {
RETURN_IF_ERROR(i->open(state, context, scope));
}
RETURN_IF_ERROR(VExpr::init_function_context(context, scope, _function));
RETURN_IF_ERROR(VExpr::init_function_context(state, context, scope, _function));
if (scope == FunctionContext::FRAGMENT_LOCAL) {
RETURN_IF_ERROR(VExpr::get_const_col(context, nullptr));
}
Expand Down
3 changes: 2 additions & 1 deletion be/src/vec/exprs/vectorized_fn_call.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "common/config.h"
#include "common/consts.h"
#include "common/status.h"
#include "pipeline/pipeline_task.h"
#include "runtime/runtime_state.h"
#include "udf/udf.h"
#include "vec/columns/column.h"
Expand Down Expand Up @@ -125,7 +126,7 @@ Status VectorizedFnCall::open(RuntimeState* state, VExprContext* context,
for (auto& i : _children) {
RETURN_IF_ERROR(i->open(state, context, scope));
}
RETURN_IF_ERROR(VExpr::init_function_context(context, scope, _function));
RETURN_IF_ERROR(VExpr::init_function_context(state, context, scope, _function));
if (scope == FunctionContext::FRAGMENT_LOCAL) {
RETURN_IF_ERROR(VExpr::get_const_col(context, nullptr));
}
Expand Down
9 changes: 8 additions & 1 deletion be/src/vec/exprs/vexpr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "common/config.h"
#include "common/exception.h"
#include "common/status.h"
#include "pipeline/pipeline_task.h"
#include "runtime/define_primitive_type.h"
#include "vec/columns/column_vector.h"
#include "vec/columns/columns_number.h"
Expand Down Expand Up @@ -562,7 +563,7 @@ void VExpr::register_function_context(RuntimeState* state, VExprContext* context
_fn_context_index = context->register_function_context(state, _type, arg_types);
}

Status VExpr::init_function_context(VExprContext* context,
Status VExpr::init_function_context(RuntimeState* state, VExprContext* context,
FunctionContext::FunctionStateScope scope,
const FunctionBasePtr& function) const {
FunctionContext* fn_ctx = context->fn_context(_fn_context_index);
Expand All @@ -574,6 +575,12 @@ Status VExpr::init_function_context(VExprContext* context,
constant_cols.push_back(const_col);
}
fn_ctx->set_constant_cols(constant_cols);
} else {
if (function->is_udf_function()) {
auto* timer = ADD_TIMER(state->get_task()->get_task_profile(),
"UDF[" + function->get_name() + "]");
fn_ctx->set_udf_execute_timer(timer);
}
}

if (scope == FunctionContext::FRAGMENT_LOCAL) {
Expand Down
3 changes: 2 additions & 1 deletion be/src/vec/exprs/vexpr.h
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,8 @@ class VExpr {
/// 1. Set constant columns result of function arguments.
/// 2. Call function's prepare() to initialize function state, fragment-local or
/// thread-local according the input `FunctionStateScope` argument.
Status init_function_context(VExprContext* context, FunctionContext::FunctionStateScope scope,
Status init_function_context(RuntimeState* state, VExprContext* context,
FunctionContext::FunctionStateScope scope,
const FunctionBasePtr& function) const;

/// Helper function to close function context, fragment-local or thread-local according
Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/exprs/vin_predicate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ Status VInPredicate::open(RuntimeState* state, VExprContext* context,
for (auto& child : _children) {
RETURN_IF_ERROR(child->open(state, context, scope));
}
RETURN_IF_ERROR(VExpr::init_function_context(context, scope, _function));
RETURN_IF_ERROR(VExpr::init_function_context(state, context, scope, _function));
if (scope == FunctionContext::FRAGMENT_LOCAL) {
RETURN_IF_ERROR(VExpr::get_const_col(context, nullptr));
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/exprs/vmatch_predicate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ Status VMatchPredicate::open(RuntimeState* state, VExprContext* context,
for (int i = 0; i < _children.size(); ++i) {
RETURN_IF_ERROR(_children[i]->open(state, context, scope));
}
RETURN_IF_ERROR(VExpr::init_function_context(context, scope, _function));
RETURN_IF_ERROR(VExpr::init_function_context(state, context, scope, _function));
if (scope == FunctionContext::THREAD_LOCAL || scope == FunctionContext::FRAGMENT_LOCAL) {
context->fn_context(_fn_context_index)->set_function_state(scope, _inverted_index_ctx);
}
Expand Down
2 changes: 2 additions & 0 deletions be/src/vec/functions/function.h
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,8 @@ class IFunctionBase {

virtual bool is_use_default_implementation_for_constants() const = 0;

virtual bool is_udf_function() const { return false; }

/// The property of monotonicity for a certain range.
struct Monotonicity {
bool is_monotonic = false; /// Is the function monotonous (nondecreasing or nonincreasing).
Expand Down
3 changes: 2 additions & 1 deletion be/src/vec/functions/function_java_udf.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ Status JavaFunctionCall::open(FunctionContext* context, FunctionContext::Functio
RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env));

if (scope == FunctionContext::FunctionStateScope::THREAD_LOCAL) {
SCOPED_TIMER(context->get_udf_execute_timer());
std::shared_ptr<JniContext> jni_ctx = std::make_shared<JniContext>();
context->set_function_state(FunctionContext::THREAD_LOCAL, jni_ctx);

Expand Down Expand Up @@ -96,7 +97,7 @@ Status JavaFunctionCall::execute_impl(FunctionContext* context, Block& block,
RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env));
JniContext* jni_ctx = reinterpret_cast<JniContext*>(
context->get_function_state(FunctionContext::THREAD_LOCAL));

SCOPED_TIMER(context->get_udf_execute_timer());
std::unique_ptr<long[]> input_table;
RETURN_IF_ERROR(JniConnector::to_java_table(&block, num_rows, arguments, input_table));
auto input_table_schema = JniConnector::parse_table_schema(&block, arguments, true);
Expand Down
2 changes: 2 additions & 0 deletions be/src/vec/functions/function_java_udf.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ class JavaFunctionCall : public IFunctionBase {

bool is_use_default_implementation_for_constants() const override { return true; }

bool is_udf_function() const override { return true; }

private:
const TFunction& fn_;
const DataTypes _argument_types;
Expand Down

0 comments on commit 26beabb

Please sign in to comment.