From fe88b073e77b5e65439150979b60e4043fd0a46e Mon Sep 17 00:00:00 2001
From: zhangstar333 <87313068+zhangstar333@users.noreply.github.com>
Date: Wed, 16 Oct 2024 16:42:26 +0800
Subject: [PATCH] [improve](profile) add timer for record udf execute time
(#41779)
## Proposed changes
---
be/src/pipeline/pipeline_task.h | 2 ++
be/src/udf/udf.h | 9 +++++++++
be/src/vec/exprs/vcase_expr.cpp | 2 +-
be/src/vec/exprs/vcast_expr.cpp | 2 +-
be/src/vec/exprs/vectorized_fn_call.cpp | 3 ++-
be/src/vec/exprs/vexpr.cpp | 9 ++++++++-
be/src/vec/exprs/vexpr.h | 3 ++-
be/src/vec/exprs/vin_predicate.cpp | 2 +-
be/src/vec/exprs/vmatch_predicate.cpp | 2 +-
be/src/vec/functions/function.h | 2 ++
be/src/vec/functions/function_java_udf.cpp | 3 ++-
be/src/vec/functions/function_java_udf.h | 2 ++
12 files changed, 33 insertions(+), 8 deletions(-)
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index 223420ea55aff1f..febc9634c49f237 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -224,6 +224,8 @@ class PipelineTask {
RuntimeState* runtime_state() const { return _state; }
+ RuntimeProfile* get_task_profile() const { return _task_profile.get(); }
+
std::string task_name() const { return fmt::format("task{}({})", _index, _pipeline->_name); }
void stop_if_finished() {
diff --git a/be/src/udf/udf.h b/be/src/udf/udf.h
index 39af2ad1c25c130..d717c18ccec64a4 100644
--- a/be/src/udf/udf.h
+++ b/be/src/udf/udf.h
@@ -26,6 +26,7 @@
#include
#include "runtime/types.h"
+#include "util/runtime_profile.h"
#include "vec/common/arena.h"
namespace doris {
@@ -88,6 +89,12 @@ class FunctionContext {
_jsonb_string_as_string = jsonb_string_as_string;
}
+ void set_udf_execute_timer(RuntimeProfile::Counter* udf_execute_timer) {
+ _udf_execute_timer = udf_execute_timer;
+ }
+
+ RuntimeProfile::Counter* get_udf_execute_timer() { return _udf_execute_timer; }
+
// Cast flag, when enable string_as_jsonb_string, string casting to jsonb will not parse string
// instead just insert a string literal
bool string_as_jsonb_string() const { return _string_as_jsonb_string; }
@@ -176,6 +183,8 @@ class FunctionContext {
std::vector> _constant_cols;
+ //udf execute timer
+ RuntimeProfile::Counter* _udf_execute_timer = nullptr;
bool _check_overflow_for_decimal = false;
bool _string_as_jsonb_string = false;
diff --git a/be/src/vec/exprs/vcase_expr.cpp b/be/src/vec/exprs/vcase_expr.cpp
index f5fe920d931a7a7..222a8f5629af778 100644
--- a/be/src/vec/exprs/vcase_expr.cpp
+++ b/be/src/vec/exprs/vcase_expr.cpp
@@ -86,7 +86,7 @@ Status VCaseExpr::open(RuntimeState* state, VExprContext* context,
for (auto& i : _children) {
RETURN_IF_ERROR(i->open(state, context, scope));
}
- RETURN_IF_ERROR(VExpr::init_function_context(context, scope, _function));
+ RETURN_IF_ERROR(VExpr::init_function_context(state, context, scope, _function));
if (scope == FunctionContext::FRAGMENT_LOCAL) {
RETURN_IF_ERROR(VExpr::get_const_col(context, nullptr));
}
diff --git a/be/src/vec/exprs/vcast_expr.cpp b/be/src/vec/exprs/vcast_expr.cpp
index 7b69690385bb92e..38f861add87224d 100644
--- a/be/src/vec/exprs/vcast_expr.cpp
+++ b/be/src/vec/exprs/vcast_expr.cpp
@@ -89,7 +89,7 @@ doris::Status VCastExpr::open(doris::RuntimeState* state, VExprContext* context,
for (auto& i : _children) {
RETURN_IF_ERROR(i->open(state, context, scope));
}
- RETURN_IF_ERROR(VExpr::init_function_context(context, scope, _function));
+ RETURN_IF_ERROR(VExpr::init_function_context(state, context, scope, _function));
if (scope == FunctionContext::FRAGMENT_LOCAL) {
RETURN_IF_ERROR(VExpr::get_const_col(context, nullptr));
}
diff --git a/be/src/vec/exprs/vectorized_fn_call.cpp b/be/src/vec/exprs/vectorized_fn_call.cpp
index 48582d83cbac76c..3192653a816f2c5 100644
--- a/be/src/vec/exprs/vectorized_fn_call.cpp
+++ b/be/src/vec/exprs/vectorized_fn_call.cpp
@@ -27,6 +27,7 @@
#include "common/config.h"
#include "common/consts.h"
#include "common/status.h"
+#include "pipeline/pipeline_task.h"
#include "runtime/runtime_state.h"
#include "udf/udf.h"
#include "vec/columns/column.h"
@@ -125,7 +126,7 @@ Status VectorizedFnCall::open(RuntimeState* state, VExprContext* context,
for (auto& i : _children) {
RETURN_IF_ERROR(i->open(state, context, scope));
}
- RETURN_IF_ERROR(VExpr::init_function_context(context, scope, _function));
+ RETURN_IF_ERROR(VExpr::init_function_context(state, context, scope, _function));
if (scope == FunctionContext::FRAGMENT_LOCAL) {
RETURN_IF_ERROR(VExpr::get_const_col(context, nullptr));
}
diff --git a/be/src/vec/exprs/vexpr.cpp b/be/src/vec/exprs/vexpr.cpp
index 13f67bc2eaf9aac..7e77d92c3106b60 100644
--- a/be/src/vec/exprs/vexpr.cpp
+++ b/be/src/vec/exprs/vexpr.cpp
@@ -31,6 +31,7 @@
#include "common/config.h"
#include "common/exception.h"
#include "common/status.h"
+#include "pipeline/pipeline_task.h"
#include "runtime/define_primitive_type.h"
#include "vec/columns/column_vector.h"
#include "vec/columns/columns_number.h"
@@ -562,7 +563,7 @@ void VExpr::register_function_context(RuntimeState* state, VExprContext* context
_fn_context_index = context->register_function_context(state, _type, arg_types);
}
-Status VExpr::init_function_context(VExprContext* context,
+Status VExpr::init_function_context(RuntimeState* state, VExprContext* context,
FunctionContext::FunctionStateScope scope,
const FunctionBasePtr& function) const {
FunctionContext* fn_ctx = context->fn_context(_fn_context_index);
@@ -574,6 +575,12 @@ Status VExpr::init_function_context(VExprContext* context,
constant_cols.push_back(const_col);
}
fn_ctx->set_constant_cols(constant_cols);
+ } else {
+ if (function->is_udf_function()) {
+ auto* timer = ADD_TIMER(state->get_task()->get_task_profile(),
+ "UDF[" + function->get_name() + "]");
+ fn_ctx->set_udf_execute_timer(timer);
+ }
}
if (scope == FunctionContext::FRAGMENT_LOCAL) {
diff --git a/be/src/vec/exprs/vexpr.h b/be/src/vec/exprs/vexpr.h
index 558b4f57eabe0ac..142ab6a27d2b5e0 100644
--- a/be/src/vec/exprs/vexpr.h
+++ b/be/src/vec/exprs/vexpr.h
@@ -286,7 +286,8 @@ class VExpr {
/// 1. Set constant columns result of function arguments.
/// 2. Call function's prepare() to initialize function state, fragment-local or
/// thread-local according the input `FunctionStateScope` argument.
- Status init_function_context(VExprContext* context, FunctionContext::FunctionStateScope scope,
+ Status init_function_context(RuntimeState* state, VExprContext* context,
+ FunctionContext::FunctionStateScope scope,
const FunctionBasePtr& function) const;
/// Helper function to close function context, fragment-local or thread-local according
diff --git a/be/src/vec/exprs/vin_predicate.cpp b/be/src/vec/exprs/vin_predicate.cpp
index 2b95cc921233bd9..179a30971eabb8e 100644
--- a/be/src/vec/exprs/vin_predicate.cpp
+++ b/be/src/vec/exprs/vin_predicate.cpp
@@ -91,7 +91,7 @@ Status VInPredicate::open(RuntimeState* state, VExprContext* context,
for (auto& child : _children) {
RETURN_IF_ERROR(child->open(state, context, scope));
}
- RETURN_IF_ERROR(VExpr::init_function_context(context, scope, _function));
+ RETURN_IF_ERROR(VExpr::init_function_context(state, context, scope, _function));
if (scope == FunctionContext::FRAGMENT_LOCAL) {
RETURN_IF_ERROR(VExpr::get_const_col(context, nullptr));
}
diff --git a/be/src/vec/exprs/vmatch_predicate.cpp b/be/src/vec/exprs/vmatch_predicate.cpp
index 813e7ba24ea9198..c80933df13c0bd4 100644
--- a/be/src/vec/exprs/vmatch_predicate.cpp
+++ b/be/src/vec/exprs/vmatch_predicate.cpp
@@ -109,7 +109,7 @@ Status VMatchPredicate::open(RuntimeState* state, VExprContext* context,
for (int i = 0; i < _children.size(); ++i) {
RETURN_IF_ERROR(_children[i]->open(state, context, scope));
}
- RETURN_IF_ERROR(VExpr::init_function_context(context, scope, _function));
+ RETURN_IF_ERROR(VExpr::init_function_context(state, context, scope, _function));
if (scope == FunctionContext::THREAD_LOCAL || scope == FunctionContext::FRAGMENT_LOCAL) {
context->fn_context(_fn_context_index)->set_function_state(scope, _inverted_index_ctx);
}
diff --git a/be/src/vec/functions/function.h b/be/src/vec/functions/function.h
index f618cc9a1b0ffb6..4702a4b7af0bbf2 100644
--- a/be/src/vec/functions/function.h
+++ b/be/src/vec/functions/function.h
@@ -210,6 +210,8 @@ class IFunctionBase {
virtual bool is_use_default_implementation_for_constants() const = 0;
+ virtual bool is_udf_function() const { return false; }
+
/// The property of monotonicity for a certain range.
struct Monotonicity {
bool is_monotonic = false; /// Is the function monotonous (nondecreasing or nonincreasing).
diff --git a/be/src/vec/functions/function_java_udf.cpp b/be/src/vec/functions/function_java_udf.cpp
index 63985f1964c42ea..e2c441b660201d6 100644
--- a/be/src/vec/functions/function_java_udf.cpp
+++ b/be/src/vec/functions/function_java_udf.cpp
@@ -44,6 +44,7 @@ Status JavaFunctionCall::open(FunctionContext* context, FunctionContext::Functio
RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env));
if (scope == FunctionContext::FunctionStateScope::THREAD_LOCAL) {
+ SCOPED_TIMER(context->get_udf_execute_timer());
std::shared_ptr jni_ctx = std::make_shared();
context->set_function_state(FunctionContext::THREAD_LOCAL, jni_ctx);
@@ -96,7 +97,7 @@ Status JavaFunctionCall::execute_impl(FunctionContext* context, Block& block,
RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env));
JniContext* jni_ctx = reinterpret_cast(
context->get_function_state(FunctionContext::THREAD_LOCAL));
-
+ SCOPED_TIMER(context->get_udf_execute_timer());
std::unique_ptr input_table;
RETURN_IF_ERROR(JniConnector::to_java_table(&block, num_rows, arguments, input_table));
auto input_table_schema = JniConnector::parse_table_schema(&block, arguments, true);
diff --git a/be/src/vec/functions/function_java_udf.h b/be/src/vec/functions/function_java_udf.h
index e507392184f85dc..e35fc67881acb5c 100644
--- a/be/src/vec/functions/function_java_udf.h
+++ b/be/src/vec/functions/function_java_udf.h
@@ -107,6 +107,8 @@ class JavaFunctionCall : public IFunctionBase {
bool is_use_default_implementation_for_constants() const override { return true; }
+ bool is_udf_function() const override { return true; }
+
private:
const TFunction& fn_;
const DataTypes _argument_types;