add flink udf hook
ChengJie1053 committed Jan 12, 2024
1 parent e13e1f3 commit 5378a18
Showing 8 changed files with 104 additions and 94 deletions.
@@ -49,6 +49,7 @@ public Operation createOperation(SqlCommandCall call, FlinkEngineConnContext con
context, call.operands[0], Boolean.parseBoolean(call.operands[1]));
break;
case CREATE_TABLE:
case CREATE_FUNCTION:
case DROP_TABLE:
case ALTER_TABLE:
case CREATE_CATALOG:
@@ -59,6 +59,9 @@ private String getExceptionMsg() {
case CREATE_TABLE:
actionMsg = "create a table";
break;
case CREATE_FUNCTION:
actionMsg = "create a function";
break;
case CREATE_DATABASE:
actionMsg = "create a database";
break;
@@ -46,6 +46,8 @@ public enum SqlCommand {

CREATE_DATABASE,

CREATE_FUNCTION,

ALTER_DATABASE,

DROP_DATABASE,
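
The new enum value routes Flink's CREATE FUNCTION DDL. A minimal Scala sketch of the statement shape, reusing the example UDF class added at the end of this commit (the function name here is hypothetical, and the exact DDL text emitted by FlinkUdfUtils.generateFlinkUdfSql is an assumption):

    // The DDL a CREATE_FUNCTION command carries verbatim in operands(0)
    val createFunctionDdl: String =
      "CREATE FUNCTION linkis_flink_udf AS " + // hypothetical function name
        "'org.apache.linkis.engineplugin.flink.LinkisFlinkUdfExample'"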
@@ -152,6 +152,9 @@ private Optional<SqlCommandCall> parseStmt(String stmt, boolean isBlinkPlanner)
} else if (node instanceof SqlCreateDatabase) {
cmd = SqlCommand.CREATE_DATABASE;
operands = new String[] {stmt};
} else if (node instanceof SqlCreateFunction) {
cmd = SqlCommand.CREATE_FUNCTION;
operands = new String[] {stmt};
} else if (node instanceof SqlDropDatabase) {
cmd = SqlCommand.DROP_DATABASE;
operands = new String[] {stmt};
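
A round-trip sketch of the new branch through the public parser entry point (the same API FlinkSQLComputationExecutor calls; the result field names below are assumptions):

    val stmt: String =
      "CREATE FUNCTION linkis_flink_udf AS " +
        "'org.apache.linkis.engineplugin.flink.LinkisFlinkUdfExample'"
    val callOpt = SqlCommandParser.getSqlCommandParser.parse(stmt, true)
    // expected: callOpt.isPresent, with command CREATE_FUNCTION
    // and operands == Array(stmt)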
@@ -18,5 +18,5 @@ wds.linkis.server.version=v1
wds.linkis.engineconn.debug.enable=true
#wds.linkis.keytab.enable=true
wds.linkis.engineconn.plugin.default.class=org.apache.linkis.engineconnplugin.flink.FlinkEngineConnPlugin
wds.linkis.engine.connector.hooks=org.apache.linkis.engineconn.computation.executor.hook.ComputationEngineConnHook
wds.linkis.engine.connector.hooks=org.apache.linkis.engineconn.computation.executor.hook.ComputationEngineConnHook,org.apache.linkis.engineconnplugin.flink.hook.FlinkJarUdfEngineHook
wds.linkis.engineconn.executor.manager.class=org.apache.linkis.engineconnplugin.flink.executormanager.FlinkExecutorManager
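
The hooks property is an ordered, comma-separated list: ComputationEngineConnHook still runs first at engine startup, and the new FlinkJarUdfEngineHook registers user jar UDFs after it. A hedged sketch of how such a chain is typically materialized (the actual Linkis loader may differ):

    // Assumption: hook classes are instantiated reflectively in declaration order
    val hooksProperty: String =
      "org.apache.linkis.engineconn.computation.executor.hook.ComputationEngineConnHook," +
        "org.apache.linkis.engineconnplugin.flink.hook.FlinkJarUdfEngineHook"
    val hooks = hooksProperty
      .split(",")
      .map(_.trim)
      .map(name => Class.forName(name).getDeclaredConstructor().newInstance())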
@@ -39,7 +39,6 @@ import org.apache.linkis.engineconnplugin.flink.client.sql.operation.{
import org.apache.linkis.engineconnplugin.flink.client.sql.operation.impl.InsertOperation
import org.apache.linkis.engineconnplugin.flink.client.sql.operation.result.ResultKind
import org.apache.linkis.engineconnplugin.flink.client.sql.parser.SqlCommandParser
import org.apache.linkis.engineconnplugin.flink.client.utils.FlinkUdfUtils
import org.apache.linkis.engineconnplugin.flink.config.{
FlinkEnvConfiguration,
FlinkExecutionTargetType
@@ -51,22 +50,18 @@ import org.apache.linkis.engineconnplugin.flink.listener.{
}
import org.apache.linkis.engineconnplugin.flink.listener.RowsType.RowsType
import org.apache.linkis.governance.common.paser.SQLCodeParser
import org.apache.linkis.manager.label.entity.engine.UserCreatorLabel
import org.apache.linkis.protocol.engine.JobProgressInfo
import org.apache.linkis.scheduler.executer.{
ErrorExecuteResponse,
ExecuteResponse,
SuccessExecuteResponse
}
import org.apache.linkis.storage.resultset.ResultSetFactory
import org.apache.linkis.udf.UDFClient

import org.apache.calcite.rel.metadata.{JaninoRelMetadataProvider, RelMetadataQueryBase}
import org.apache.commons.lang3.StringUtils
import org.apache.flink.api.common.JobStatus._
import org.apache.flink.configuration.DeploymentOptions
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions
import org.apache.flink.table.api.TableResult
import org.apache.flink.table.planner.plan.metadata.FlinkDefaultRelMetadataProvider
import org.apache.flink.yarn.configuration.YarnConfigOptions
import org.apache.hadoop.yarn.util.ConverterUtils
@@ -130,9 +125,6 @@ class FlinkSQLComputationExecutor(
engineExecutionContext: EngineExecutionContext,
code: String
): ExecuteResponse = {
// A flink udf load failure does not affect task execution
Utils.tryAndWarn(loadFlinkUdf(engineExecutionContext))

val callOpt = SqlCommandParser.getSqlCommandParser.parse(code.trim, true)
val callSQL =
if (!callOpt.isPresent)
@@ -202,91 +194,6 @@ }
}
}

private def getExecSqlUser(engineExecutionContext: EngineExecutionContext): String = {
val userCreatorLabel = engineExecutionContext.getLabels
.find(_.isInstanceOf[UserCreatorLabel])
.get
.asInstanceOf[UserCreatorLabel]
userCreatorLabel.getUser
}

private def loadFlinkUdf(engineExecutionContext: EngineExecutionContext) = {
logger.info("Flink starts loading udf")

val execSqlUser = getExecSqlUser(engineExecutionContext)
val udfAllLoad: String =
engineExecutionContext.getProperties.getOrDefault("linkis.user.udf.all.load", "true").toString
val udfIdStr: String =
engineExecutionContext.getProperties.getOrDefault("linkis.user.udf.custom.ids", "").toString
val udfIds = udfIdStr.split(",").filter(StringUtils.isNotBlank).map(s => s.toLong)

logger.info(s"start loading UDFs, user: $execSqlUser, load all: $udfAllLoad, udfIds: ${udfIds
.mkString("Array(", ", ", ")")}")

val udfInfos =
if (udfAllLoad.toBoolean) UDFClient.getJarUdf(execSqlUser)
else UDFClient.getJarUdfByIds(execSqlUser, udfIds)

if (udfInfos.nonEmpty) {
import scala.util.control.Breaks._
udfInfos.foreach { udfInfo =>
val path: String = udfInfo.getPath
val registerFormat: String = udfInfo.getRegisterFormat

breakable {
if (StringUtils.isBlank(path) && StringUtils.isBlank(registerFormat)) {
logger.warn("flink udf udfInfo path or RegisterFormat cannot is empty")
break()
}
logger.info(
s"udfName:${udfInfo.getUdfName}, bml_resource_id:${udfInfo.getBmlResourceId}, bml_id:${udfInfo.getId}\n"
)

val udfClassName: String = FlinkUdfUtils.extractUdfClass(registerFormat)
if (StringUtils.isBlank(udfClassName)) {
logger.warn("flink udf extract Udf Class cannot is empty")
break()
}

FlinkUdfUtils.loadJar(path)

if (!FlinkUdfUtils.isFlinkUdf(ClassLoader.getSystemClassLoader(), udfClassName)) {
logger.warn(
"There is no extends UserDefinedFunction, skip loading flink udf: {} ",
path
)
break()
}

val context = clusterDescriptor.executionContext
val flinkUdfSql: String =
FlinkUdfUtils.generateFlinkUdfSql(udfInfo.getUdfName, udfClassName)

FlinkUdfUtils.addFlinkPipelineClasspaths(context.getStreamExecutionEnvironment, path)
val tableEnv = context.getTableEnvironment
logger.info("Flink execute udf sql:{}", flinkUdfSql)
val tableResult: TableResult = tableEnv.executeSql(flinkUdfSql)

var loadUdfLog =
s"udfName:${udfInfo.getUdfName}, udfJar:${path}, udfClass:${udfClassName}, Flink load udf %s ."
if (
tableResult.getResultKind != null && tableResult.getResultKind
.name()
.equalsIgnoreCase("SUCCESS")
) {
loadUdfLog = String.format(loadUdfLog, "success")
logger.info(loadUdfLog)
} else {
loadUdfLog = String.format(loadUdfLog, "failed")
logger.error(loadUdfLog)
}
engineExecutionContext.appendStdout(loadUdfLog)
}
}

}
}

override def executeCompletely(
engineExecutorContext: EngineExecutionContext,
code: String,
@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.linkis.engineconnplugin.flink.hook

import org.apache.linkis.engineconn.computation.executor.hook.UDFLoadEngineConnHook
import org.apache.linkis.engineconnplugin.flink.client.utils.FlinkUdfUtils
import org.apache.linkis.manager.label.entity.engine.RunType
import org.apache.linkis.udf.utils.ConstantVar
import org.apache.linkis.udf.vo.UDFInfoVo

import org.apache.commons.lang3.StringUtils

class FlinkJarUdfEngineHook extends UDFLoadEngineConnHook {
override val udfType: BigInt = ConstantVar.UDF_JAR
override val category: String = ConstantVar.UDF
override val runType = RunType.SQL

override protected def constructCode(udfInfo: UDFInfoVo): String = {
val path: String = udfInfo.getPath
val registerFormat: String = udfInfo.getRegisterFormat

if (StringUtils.isBlank(path) && StringUtils.isBlank(registerFormat)) {
logger.warn("Flink udfInfo path or registerFormat cannot is empty")
return ""
}

val udfClassName: String = FlinkUdfUtils.extractUdfClass(registerFormat)
if (StringUtils.isBlank(udfClassName)) {
logger.warn("Flink extract udf class name cannot is empty")
return ""
}

FlinkUdfUtils.loadJar(path)

if (!FlinkUdfUtils.isFlinkUdf(ClassLoader.getSystemClassLoader(), udfClassName)) {
logger.warn(
"There is no extends Flink UserDefinedFunction, skip loading flink udf: {} ",
path
)
return ""
}

val flinkUdfSql: String =
FlinkUdfUtils.generateFlinkUdfSql(udfInfo.getUdfName, udfClassName)

logger.info(
s"Flink start load udf, udfName:${udfInfo.getUdfName}, udfJar:${path}, udfClass:${udfClassName}\n"
)

"%sql\n" + flinkUdfSql
}

}
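
Unlike the removed executor-side loader, this hook only builds the registration code; the base UDFLoadEngineConnHook is expected to execute whatever constructCode returns as a SQL task. A sketch of the expected output for the example UDF (the jar path and registerFormat values are hypothetical, and constructCode is protected, so picture a test subclass):

    // Given a UDFInfoVo with
    //   path           = "hdfs:///udf/linkis-flink-udf-example.jar"  (hypothetical)
    //   registerFormat = "create function linkis_flink_udf as
    //                     'org.apache.linkis.engineplugin.flink.LinkisFlinkUdfExample'"
    // constructCode returns the two-line snippet:
    //   %sql
    //   CREATE FUNCTION linkis_flink_udf AS 'org.apache.linkis.engineplugin.flink.LinkisFlinkUdfExample'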
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.linkis.engineplugin.flink;

import org.apache.flink.table.functions.ScalarFunction;

public class LinkisFlinkUdfExample extends ScalarFunction {
public String eval(String str) {
return String.format("linkis flink udf test: %s", str);
}
}
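
To sanity-check the example UDF outside Linkis, it can be registered directly against a Flink TableEnvironment. A minimal Scala sketch (assumes the compiled UDF class and a Flink planner are on the classpath):

    import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

    val tableEnv = TableEnvironment.create(
      EnvironmentSettings.newInstance().inBatchMode().build())
    tableEnv.executeSql(
      "CREATE FUNCTION linkis_flink_udf AS " +
        "'org.apache.linkis.engineplugin.flink.LinkisFlinkUdfExample'")
    tableEnv
      .executeSql("SELECT linkis_flink_udf(word) FROM (VALUES ('hello')) AS t(word)")
      .print()
    // expected row: linkis flink udf test: hello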
