Flink supports udf function
ChengJie1053 committed Jan 11, 2024
1 parent be7b820 commit e13e1f3
Showing 4 changed files with 227 additions and 2 deletions.
@@ -51,7 +51,7 @@ public abstract class ClusterDescriptorAdapter implements Closeable {
public static final long CLIENT_REQUEST_TIMEOUT =
FlinkEnvConfiguration.FLINK_CLIENT_REQUEST_TIMEOUT().getValue().toLong();

protected final ExecutionContext executionContext;
public final ExecutionContext executionContext;
// jobId is not null only after job is submitted
private JobID jobId;
protected ApplicationId clusterID;
@@ -0,0 +1,116 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.linkis.engineconnplugin.flink.client.utils;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.functions.UserDefinedFunction;

import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FlinkUdfUtils {

private static final Logger logger = LoggerFactory.getLogger(FlinkUdfUtils.class);

private static final String CREATE_TEMP_FUNCTION_PATTERN =
"create\\s+temporary\\s+function\\s+(\\w+)\\s+as\\s+\"(.*?)\"";

private static final String CREATE_TEMP_FUNCTION_SQL =
"CREATE TEMPORARY FUNCTION IF NOT EXISTS %s AS '%s' ";

public static void addFlinkPipelineClasspaths(StreamExecutionEnvironment env, String path) {
logger.info("Flink udf start add pipeline classpaths, jar path: {}", path);

try {
Field configuration = StreamExecutionEnvironment.class.getDeclaredField("configuration");
configuration.setAccessible(true);
Configuration conf = (Configuration) configuration.get(env);

Field confData = Configuration.class.getDeclaredField("confData");
confData.setAccessible(true);
Map<String, Object> map = (Map<String, Object>) confData.get(conf);
List<String> jarList = new ArrayList<>();
jarList.add(path);
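// Note: map.put overwrites any existing pipeline.classpaths entry rather than appending to it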
map.put(PipelineOptions.CLASSPATHS.key(), jarList);
} catch (Exception e) {
logger.warn("Flink udf add pipeline classpaths failed", e);
}
}

public static void loadJar(String jarPath) {
logger.info("Flink udf URLClassLoader start loadJar: {}", jarPath);

Method method = null;
Boolean accessible = null;
try {
method = URLClassLoader.class.getDeclaredMethod("addURL", URL.class);
accessible = method.isAccessible();

if (!accessible) {
method.setAccessible(true);
}
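// Caveat: on Java 9+ the system class loader is no longer a URLClassLoader, so this cast
// throws ClassCastException; this reflective approach assumes a Java 8 runtime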
URLClassLoader classLoader = (URLClassLoader) ClassLoader.getSystemClassLoader();
method.invoke(classLoader, new URL(jarPath));

} catch (Exception e) {
logger.warn("Flink udf URLClassLoader loadJar failed", e);
} finally {
if (accessible != null) {
method.setAccessible(accessible);
}
}
}

public static String extractUdfClass(String statement) {
Pattern pattern = Pattern.compile(CREATE_TEMP_FUNCTION_PATTERN);
Matcher matcher = pattern.matcher(statement);
if (matcher.find() && matcher.groupCount() >= 2) {
return matcher.group(2);
}
return "";
}

public static boolean isFlinkUdf(ClassLoader classLoader, String className) {
try {
Class<?> udfClass = classLoader.loadClass(className);
if (UserDefinedFunction.class.isAssignableFrom(udfClass)) {
return true;
}

} catch (ClassNotFoundException e) {
logger.warn("flink udf load isFlinkUdf failed, ClassNotFoundException: {}", className);
}
return false;
}

public static String generateFlinkUdfSql(String name, String className) {
return String.format(CREATE_TEMP_FUNCTION_SQL, name, className);
}
}
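
Taken together, a minimal sketch of how these helpers are intended to compose; the statement, jar path, and class name below are hypothetical:

// Hypothetical end-to-end use of FlinkUdfUtils
String statement =
    "create temporary function parse_json as \"com.example.udf.ParseJson\"";
String udfClass = FlinkUdfUtils.extractUdfClass(statement); // "com.example.udf.ParseJson"
FlinkUdfUtils.loadJar("file:///tmp/udf/parse-json.jar"); // hypothetical jar location
if (FlinkUdfUtils.isFlinkUdf(ClassLoader.getSystemClassLoader(), udfClass)) {
  String sql = FlinkUdfUtils.generateFlinkUdfSql("parse_json", udfClass);
  // -> CREATE TEMPORARY FUNCTION IF NOT EXISTS parse_json AS 'com.example.udf.ParseJson'
}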
@@ -39,6 +39,7 @@ import org.apache.linkis.engineconnplugin.flink.client.sql.operation.{
import org.apache.linkis.engineconnplugin.flink.client.sql.operation.impl.InsertOperation
import org.apache.linkis.engineconnplugin.flink.client.sql.operation.result.ResultKind
import org.apache.linkis.engineconnplugin.flink.client.sql.parser.SqlCommandParser
import org.apache.linkis.engineconnplugin.flink.client.utils.FlinkUdfUtils
import org.apache.linkis.engineconnplugin.flink.config.{
FlinkEnvConfiguration,
FlinkExecutionTargetType
@@ -50,18 +51,22 @@ import org.apache.linkis.engineconnplugin.flink.listener.{
}
import org.apache.linkis.engineconnplugin.flink.listener.RowsType.RowsType
import org.apache.linkis.governance.common.paser.SQLCodeParser
import org.apache.linkis.manager.label.entity.engine.UserCreatorLabel
import org.apache.linkis.protocol.engine.JobProgressInfo
import org.apache.linkis.scheduler.executer.{
ErrorExecuteResponse,
ExecuteResponse,
SuccessExecuteResponse
}
import org.apache.linkis.storage.resultset.ResultSetFactory
import org.apache.linkis.udf.UDFClient

import org.apache.calcite.rel.metadata.{JaninoRelMetadataProvider, RelMetadataQueryBase}
import org.apache.commons.lang3.StringUtils
import org.apache.flink.api.common.JobStatus._
import org.apache.flink.configuration.DeploymentOptions
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions
import org.apache.flink.table.api.TableResult
import org.apache.flink.table.planner.plan.metadata.FlinkDefaultRelMetadataProvider
import org.apache.flink.yarn.configuration.YarnConfigOptions
import org.apache.hadoop.yarn.util.ConverterUtils
@@ -125,6 +130,9 @@ class FlinkSQLComputationExecutor(
engineExecutionContext: EngineExecutionContext,
code: String
): ExecuteResponse = {
// A Flink UDF load failure does not affect task execution
Utils.tryAndWarn(loadFlinkUdf(engineExecutionContext))

val callOpt = SqlCommandParser.getSqlCommandParser.parse(code.trim, true)
val callSQL =
if (!callOpt.isPresent)
@@ -194,6 +202,91 @@ }
}
}

private def getExecSqlUser(engineExecutionContext: EngineExecutionContext): String = {
val userCreatorLabel = engineExecutionContext.getLabels
.find(_.isInstanceOf[UserCreatorLabel])
.get
.asInstanceOf[UserCreatorLabel]
userCreatorLabel.getUser
}

private def loadFlinkUdf(engineExecutionContext: EngineExecutionContext) = {
logger.info("Flink start load udf")

val execSqlUser = getExecSqlUser(engineExecutionContext)
val udfAllLoad: String =
engineExecutionContext.getProperties.getOrDefault("linkis.user.udf.all.load", "true").toString
val udfIdStr: String =
engineExecutionContext.getProperties.getOrDefault("linkis.user.udf.custom.ids", "").toString
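// e.g. linkis.user.udf.custom.ids = "101,102" (illustrative) restricts loading to those UDF ids when linkis.user.udf.all.load is false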
val udfIds = udfIdStr.split(",").filter(StringUtils.isNotBlank).map(s => s.toLong)

logger.info(s"start loading UDFs, user: $execSqlUser, load all: $udfAllLoad, udfIds: ${udfIds
.mkString("Array(", ", ", ")")}")

val udfInfos =
if (udfAllLoad.toBoolean) UDFClient.getJarUdf(execSqlUser)
else UDFClient.getJarUdfByIds(execSqlUser, udfIds)

if (udfInfos.nonEmpty) {
import scala.util.control.Breaks._
udfInfos.foreach { udfInfo =>
val path: String = udfInfo.getPath
val registerFormat: String = udfInfo.getRegisterFormat

breakable {
if (StringUtils.isBlank(path) && StringUtils.isBlank(registerFormat)) {
logger.warn("flink udf udfInfo path or RegisterFormat cannot is empty")
break()
}
logger.info(
s"udfName:${udfInfo.getUdfName}, bml_resource_id:${udfInfo.getBmlResourceId}, bml_id:${udfInfo.getId}\n"
)

val udfClassName: String = FlinkUdfUtils.extractUdfClass(registerFormat)
if (StringUtils.isBlank(udfClassName)) {
logger.warn("flink udf extract Udf Class cannot is empty")
break()
}

FlinkUdfUtils.loadJar(path)

if (!FlinkUdfUtils.isFlinkUdf(ClassLoader.getSystemClassLoader(), udfClassName)) {
logger.warn(
"There is no extends UserDefinedFunction, skip loading flink udf: {} ",
path
)
break()
}

val context = clusterDescriptor.executionContext
val flinkUdfSql: String =
FlinkUdfUtils.generateFlinkUdfSql(udfInfo.getUdfName, udfClassName)

FlinkUdfUtils.addFlinkPipelineClasspaths(context.getStreamExecutionEnvironment, path)
val tableEnv = context.getTableEnvironment
logger.info("Flink execute udf sql:{}", flinkUdfSql)
val tableResult: TableResult = tableEnv.executeSql(flinkUdfSql)

var loadUdfLog =
s"udfName:${udfInfo.getUdfName}, udfJar:${path}, udfClass:${udfClassName}, Flink load udf %s ."
if (
tableResult.getResultKind != null && tableResult.getResultKind
.name()
.equalsIgnoreCase("SUCCESS")
) {
loadUdfLog = String.format(loadUdfLog, "success")
logger.info(loadUdfLog)
} else {
loadUdfLog = String.format(loadUdfLog, "failed")
logger.error(loadUdfLog)
}
engineExecutionContext.appendStdout(loadUdfLog)
}
}

}
}

override def executeCompletely(
engineExecutorContext: EngineExecutionContext,
code: String,
@@ -37,12 +37,15 @@ import org.apache.linkis.manager.engineplugin.common.launch.process.LaunchConsta
addPathToClassPath,
CLASS_PATH_SEPARATOR
}
import org.apache.linkis.manager.label.entity.engine.UserCreatorLabel
import org.apache.linkis.manager.label.entity.engine.{EngineConnMode, UserCreatorLabel}
import org.apache.linkis.manager.label.utils.LabelUtil

import java.util

import scala.collection.JavaConverters._

import com.google.common.collect.Lists

class FlinkEngineConnLaunchBuilder extends JavaProcessEngineConnLaunchBuilder {

override protected def getCommands(implicit
@@ -136,4 +139,17 @@ class FlinkEngineConnLaunchBuilder extends JavaProcessEngineConnLaunchBuilder {

override protected def ifAddHiveConfigPath: Boolean = true

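// Once-mode (streaming) engines keep the default ECM hooks; all other modes register only the JarUDFLoadECMHook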
override protected def getEngineConnManagerHooks(implicit
engineConnBuildRequest: EngineConnBuildRequest
): java.util.List[String] = if (isOnceMode) {
super.getEngineConnManagerHooks(engineConnBuildRequest)
} else {
Lists.newArrayList("JarUDFLoadECMHook")
}

def isOnceMode: Boolean = {
val engineConnMode = LabelUtil.getEngineConnMode(engineConnBuildRequest.labels)
EngineConnMode.toEngineConnMode(engineConnMode) == EngineConnMode.Once
}

}
