diff --git a/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/deployment/ClusterDescriptorAdapter.java b/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/deployment/ClusterDescriptorAdapter.java
index a5ac102033..594f8dd98d 100644
--- a/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/deployment/ClusterDescriptorAdapter.java
+++ b/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/deployment/ClusterDescriptorAdapter.java
@@ -51,7 +51,7 @@ public abstract class ClusterDescriptorAdapter implements Closeable {
   public static final long CLIENT_REQUEST_TIMEOUT =
       FlinkEnvConfiguration.FLINK_CLIENT_REQUEST_TIMEOUT().getValue().toLong();
 
-  protected final ExecutionContext executionContext;
+  public final ExecutionContext executionContext;
   // jobId is not null only after job is submitted
   private JobID jobId;
   protected ApplicationId clusterID;
diff --git a/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/utils/FlinkUdfUtils.java b/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/utils/FlinkUdfUtils.java
new file mode 100644
index 0000000000..9df14197d0
--- /dev/null
+++ b/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/utils/FlinkUdfUtils.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineconnplugin.flink.client.utils;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.PipelineOptions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.functions.UserDefinedFunction;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FlinkUdfUtils {
+
+  private static final Logger logger = LoggerFactory.getLogger(FlinkUdfUtils.class);
+
+  private static final String CREATE_TEMP_FUNCTION_PATTERN =
+      "create\\s+temporary\\s+function\\s+(\\w+)\\s+as\\s+\"(.*?)\"";
+
+  private static final String CREATE_TEMP_FUNCTION_SQL =
+      "CREATE TEMPORARY FUNCTION IF NOT EXISTS %s AS '%s' ";
+
+  public static void addFlinkPipelineClasspaths(StreamExecutionEnvironment env, String path) {
+    logger.info("Flink udf start add pipeline classpaths, jar path: {}", path);
+
+    try {
+      Field configuration = StreamExecutionEnvironment.class.getDeclaredField("configuration");
+      configuration.setAccessible(true);
+      Configuration conf = (Configuration) configuration.get(env);
+
+      Field confData = Configuration.class.getDeclaredField("confData");
+      confData.setAccessible(true);
+      Map<String, Object> map = (Map<String, Object>) confData.get(conf);
+      List<String> jarList = new ArrayList<>();
+      jarList.add(path);
+      map.put(PipelineOptions.CLASSPATHS.key(), jarList);
+    } catch (Exception e) {
+      logger.warn("Flink udf add pipeline classpaths failed", e);
+    }
+  }
+
+  public static void loadJar(String jarPath) {
+    logger.info("Flink udf URLClassLoader start loadJar: {}", jarPath);
+
+    Method method = null;
+    Boolean accessible = null;
+    try {
+      method = URLClassLoader.class.getDeclaredMethod("addURL", URL.class);
+      accessible = method.isAccessible();
+
+      if (!accessible) {
+        method.setAccessible(true);
+      }
+      URLClassLoader classLoader = (URLClassLoader) ClassLoader.getSystemClassLoader();
+      method.invoke(classLoader, new URL(jarPath));
+
+    } catch (Exception e) {
+      logger.warn("Flink udf URLClassLoader loadJar failed", e);
+    } finally {
+      if (method != null && accessible != null) {
+        method.setAccessible(accessible);
+      }
+    }
+  }
+
+  public static String extractUdfClass(String statement) {
+    Pattern pattern = Pattern.compile(CREATE_TEMP_FUNCTION_PATTERN);
+    Matcher matcher = pattern.matcher(statement);
+    if (matcher.find() && matcher.groupCount() >= 2) {
+      return matcher.group(2);
+    }
+    return "";
+  }
+
+  public static boolean isFlinkUdf(ClassLoader classLoader, String className) {
+    try {
+      Class<?> udfClass = classLoader.loadClass(className);
+      if (UserDefinedFunction.class.isAssignableFrom(udfClass)) {
+        return true;
+      }
+
+    } catch (ClassNotFoundException e) {
+      logger.warn("Flink udf load isFlinkUdf failed, ClassNotFoundException: {}", className);
+    }
+    return false;
+  }
+
+  public static String generateFlinkUdfSql(String name, String className) {
+    return String.format(CREATE_TEMP_FUNCTION_SQL, name, className);
+  }
+}
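A quick, self-contained check of the two string helpers above (the statement, function name, and class name below are hypothetical). One detail worth noting: the pattern is compiled case-sensitively, so a UDF's registerFormat must use the lowercase `create temporary function ... as "..."` form for extractUdfClass to find the class name.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class FlinkUdfUtilsExample {

  public static void main(String[] args) {
    // Same pattern as FlinkUdfUtils: group(1) is the function name, group(2) the class name.
    Pattern pattern =
        Pattern.compile("create\\s+temporary\\s+function\\s+(\\w+)\\s+as\\s+\"(.*?)\"");

    // Hypothetical registerFormat value stored for a UDF jar.
    String statement = "create temporary function my_upper as \"com.example.udf.MyUpper\"";

    Matcher matcher = pattern.matcher(statement);
    if (matcher.find() && matcher.groupCount() >= 2) {
      // Mirrors generateFlinkUdfSql(name, className).
      String sql =
          String.format(
              "CREATE TEMPORARY FUNCTION IF NOT EXISTS %s AS '%s' ",
              matcher.group(1), matcher.group(2));
      // Prints: CREATE TEMPORARY FUNCTION IF NOT EXISTS my_upper AS 'com.example.udf.MyUpper'
      System.out.println(sql);
    }
  }
}
```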
diff --git a/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkSQLComputationExecutor.scala b/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkSQLComputationExecutor.scala
index 60f1d9088b..09d28cfd57 100644
--- a/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkSQLComputationExecutor.scala
+++ b/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkSQLComputationExecutor.scala
@@ -39,6 +39,7 @@ import org.apache.linkis.engineconnplugin.flink.client.sql.operation.{
 import org.apache.linkis.engineconnplugin.flink.client.sql.operation.impl.InsertOperation
 import org.apache.linkis.engineconnplugin.flink.client.sql.operation.result.ResultKind
 import org.apache.linkis.engineconnplugin.flink.client.sql.parser.SqlCommandParser
+import org.apache.linkis.engineconnplugin.flink.client.utils.FlinkUdfUtils
 import org.apache.linkis.engineconnplugin.flink.config.{
   FlinkEnvConfiguration,
   FlinkExecutionTargetType
@@ -50,6 +51,7 @@ import org.apache.linkis.engineconnplugin.flink.listener.{
 }
 import org.apache.linkis.engineconnplugin.flink.listener.RowsType.RowsType
 import org.apache.linkis.governance.common.paser.SQLCodeParser
+import org.apache.linkis.manager.label.entity.engine.UserCreatorLabel
 import org.apache.linkis.protocol.engine.JobProgressInfo
 import org.apache.linkis.scheduler.executer.{
   ErrorExecuteResponse,
@@ -57,11 +59,14 @@ import org.apache.linkis.scheduler.executer.{
   SuccessExecuteResponse
 }
 import org.apache.linkis.storage.resultset.ResultSetFactory
+import org.apache.linkis.udf.UDFClient
 
 import org.apache.calcite.rel.metadata.{JaninoRelMetadataProvider, RelMetadataQueryBase}
+import org.apache.commons.lang3.StringUtils
 import org.apache.flink.api.common.JobStatus._
 import org.apache.flink.configuration.DeploymentOptions
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions
+import org.apache.flink.table.api.TableResult
 import org.apache.flink.table.planner.plan.metadata.FlinkDefaultRelMetadataProvider
 import org.apache.flink.yarn.configuration.YarnConfigOptions
 import org.apache.hadoop.yarn.util.ConverterUtils
@@ -125,6 +130,9 @@ class FlinkSQLComputationExecutor(
       engineExecutionContext: EngineExecutionContext,
       code: String
   ): ExecuteResponse = {
+    // A failure to load Flink UDFs must not block task execution
+    Utils.tryAndWarn(loadFlinkUdf(engineExecutionContext))
+
     val callOpt = SqlCommandParser.getSqlCommandParser.parse(code.trim, true)
     val callSQL =
       if (!callOpt.isPresent)
@@ -194,6 +202,91 @@ class FlinkSQLComputationExecutor(
     }
   }
 
+  private def getExecSqlUser(engineExecutionContext: EngineExecutionContext): String = {
+    val userCreatorLabel = engineExecutionContext.getLabels
+      .find(_.isInstanceOf[UserCreatorLabel])
+      .get
+      .asInstanceOf[UserCreatorLabel]
+    userCreatorLabel.getUser
+  }
+
+  private def loadFlinkUdf(engineExecutionContext: EngineExecutionContext): Unit = {
+    logger.info("Flink start load udf")
+
+    val execSqlUser = getExecSqlUser(engineExecutionContext)
+    val udfAllLoad: String =
+      engineExecutionContext.getProperties.getOrDefault("linkis.user.udf.all.load", "true").toString
+    val udfIdStr: String =
+      engineExecutionContext.getProperties.getOrDefault("linkis.user.udf.custom.ids", "").toString
+    val udfIds = udfIdStr.split(",").filter(StringUtils.isNotBlank).map(_.toLong)
+
+    logger.info(s"Start loading UDFs, user: $execSqlUser, load all: $udfAllLoad, udfIds: ${udfIds
+      .mkString("Array(", ", ", ")")}")
+
+    val udfInfos =
+      if (udfAllLoad.toBoolean) UDFClient.getJarUdf(execSqlUser)
+      else UDFClient.getJarUdfByIds(execSqlUser, udfIds)
+
+    if (udfInfos.nonEmpty) {
+      import scala.util.control.Breaks._
+      udfInfos.foreach { udfInfo =>
+        val path: String = udfInfo.getPath
+        val registerFormat: String = udfInfo.getRegisterFormat
+
+        breakable {
+          if (StringUtils.isBlank(path) || StringUtils.isBlank(registerFormat)) {
+            logger.warn("Flink udf info path or registerFormat is empty, skip loading")
+            break()
+          }
+          logger.info(
+            s"udfName:${udfInfo.getUdfName}, bml_resource_id:${udfInfo.getBmlResourceId}, bml_id:${udfInfo.getId}\n"
+          )
+
+          val udfClassName: String = FlinkUdfUtils.extractUdfClass(registerFormat)
+          if (StringUtils.isBlank(udfClassName)) {
+            logger.warn("Flink udf class name could not be extracted, skip loading")
+            break()
+          }
+
+          FlinkUdfUtils.loadJar(path)
+
+          if (!FlinkUdfUtils.isFlinkUdf(ClassLoader.getSystemClassLoader(), udfClassName)) {
+            logger.warn(
+              "Class does not extend UserDefinedFunction, skip loading flink udf: {}",
+              path
+            )
+            break()
+          }
+
+          val context = clusterDescriptor.executionContext
+          val flinkUdfSql: String =
+            FlinkUdfUtils.generateFlinkUdfSql(udfInfo.getUdfName, udfClassName)
+
+          FlinkUdfUtils.addFlinkPipelineClasspaths(context.getStreamExecutionEnvironment, path)
+          val tableEnv = context.getTableEnvironment
+          logger.info("Flink execute udf sql: {}", flinkUdfSql)
+          val tableResult: TableResult = tableEnv.executeSql(flinkUdfSql)
+
+          var loadUdfLog =
+            s"udfName:${udfInfo.getUdfName}, udfJar:${path}, udfClass:${udfClassName}, Flink load udf %s ."
+          if (
+            tableResult.getResultKind != null && tableResult.getResultKind
+              .name()
+              .equalsIgnoreCase("SUCCESS")
+          ) {
+            loadUdfLog = String.format(loadUdfLog, "success")
+            logger.info(loadUdfLog)
+          } else {
+            loadUdfLog = String.format(loadUdfLog, "failed")
+            logger.error(loadUdfLog)
+          }
+          engineExecutionContext.appendStdout(loadUdfLog)
+        }
+      }
+
+    }
+  }
+
   override def executeCompletely(
       engineExecutorContext: EngineExecutionContext,
       code: String,
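The executor above goes through reflection because it has to mutate the configuration of a StreamExecutionEnvironment that already exists. For comparison, when the environment is created fresh, `pipeline.classpaths` can be set through the public API; a minimal sketch, assuming Flink 1.12+ and a placeholder jar URL:

```java
import java.util.Collections;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PipelineClasspathExample {

  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // file:///tmp/my-udf.jar is a placeholder; pipeline.classpaths takes URL strings.
    conf.set(PipelineOptions.CLASSPATHS, Collections.singletonList("file:///tmp/my-udf.jar"));

    // The option is read at construction time, so no private-field access is needed.
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
  }
}
```

Note also that `addFlinkPipelineClasspaths` replaces any existing `pipeline.classpaths` value with a fresh single-element list rather than appending, which matters when several UDF jars are loaded in one session.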
diff --git a/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/launch/FlinkEngineConnLaunchBuilder.scala b/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/launch/FlinkEngineConnLaunchBuilder.scala
index 70b3ad1b20..13a5bae4d5 100644
--- a/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/launch/FlinkEngineConnLaunchBuilder.scala
+++ b/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/launch/FlinkEngineConnLaunchBuilder.scala
@@ -37,12 +37,15 @@ import org.apache.linkis.manager.engineplugin.common.launch.process.LaunchConstants.{
   addPathToClassPath,
   CLASS_PATH_SEPARATOR
 }
-import org.apache.linkis.manager.label.entity.engine.UserCreatorLabel
+import org.apache.linkis.manager.label.entity.engine.{EngineConnMode, UserCreatorLabel}
+import org.apache.linkis.manager.label.utils.LabelUtil
 
 import java.util
 
 import scala.collection.JavaConverters._
 
+import com.google.common.collect.Lists
+
 class FlinkEngineConnLaunchBuilder extends JavaProcessEngineConnLaunchBuilder {
 
   override protected def getCommands(implicit
@@ -136,4 +139,17 @@ class FlinkEngineConnLaunchBuilder extends JavaProcessEngineConnLaunchBuilder {
 
   override protected def ifAddHiveConfigPath: Boolean = true
 
+  override protected def getEngineConnManagerHooks(implicit
+      engineConnBuildRequest: EngineConnBuildRequest
+  ): java.util.List[String] = if (isOnceMode) {
+    super.getEngineConnManagerHooks(engineConnBuildRequest)
+  } else {
+    Lists.newArrayList("JarUDFLoadECMHook")
+  }
+
+  def isOnceMode(implicit engineConnBuildRequest: EngineConnBuildRequest): Boolean = {
+    val engineConnMode = LabelUtil.getEngineConnMode(engineConnBuildRequest.labels)
+    EngineConnMode.toEngineConnMode(engineConnMode) == EngineConnMode.Once
+  }
+
 }
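End to end: for the load to succeed, the jar registered in Linkis must contain a class in Flink's UserDefinedFunction hierarchy, otherwise the isFlinkUdf check skips it. A minimal sketch of such a class (package and names are hypothetical); once registered through the generated CREATE TEMPORARY FUNCTION statement, it is callable as `my_upper(...)` in subsequent Flink SQL:

```java
package com.example.udf;

import org.apache.flink.table.functions.ScalarFunction;

// ScalarFunction extends UserDefinedFunction, so FlinkUdfUtils.isFlinkUdf(...)
// accepts this class once the jar has been added to the system class loader.
public class MyUpper extends ScalarFunction {

  public String eval(String input) {
    return input == null ? null : input.toUpperCase();
  }
}
```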