diff --git a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/conf/ComputationExecutorConf.scala b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/conf/ComputationExecutorConf.scala index fec2fe5e7b..c072c32794 100644 --- a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/conf/ComputationExecutorConf.scala +++ b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/conf/ComputationExecutorConf.scala @@ -118,4 +118,10 @@ object ComputationExecutorConf { val TASK_SUBMIT_WAIT_TIME_MS = CommonVars("linkis.ec.task.submit.wait.time.ms", 2L, "Task submit wait time(ms)").getValue + val ENGINE_SEND_LOG_TO_ENTRANCE_LIMIT_ENABLED = + CommonVars("linkis.ec.send.log.entrance.limit.enabled", true) + + val ENGINE_SEND_LOG_TO_ENTRANCE_LIMIT_LENGTH = + CommonVars("linkis.ec.send.log.entrance.limit.length", 2000) + } diff --git a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/execute/EngineExecutionContext.scala b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/execute/EngineExecutionContext.scala index 7367dd5330..377c32c193 100644 --- a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/execute/EngineExecutionContext.scala +++ b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/execute/EngineExecutionContext.scala @@ -193,8 +193,16 @@ class EngineExecutionContext(executor: ComputationExecutor, executorUser: String def appendStdout(log: String): Unit = if (executor.isInternalExecute) { logger.info(log) } else { + var taskLog = log + if ( + ComputationExecutorConf.ENGINE_SEND_LOG_TO_ENTRANCE_LIMIT_ENABLED.getValue && + log.length > ComputationExecutorConf.ENGINE_SEND_LOG_TO_ENTRANCE_LIMIT_LENGTH.getValue + ) { + taskLog = + s"${log.substring(0, ComputationExecutorConf.ENGINE_SEND_LOG_TO_ENTRANCE_LIMIT_LENGTH.getValue)}..." + } val listenerBus = getEngineSyncListenerBus - getJobId.foreach(jId => listenerBus.postToAll(TaskLogUpdateEvent(jId, log))) + getJobId.foreach(jId => listenerBus.postToAll(TaskLogUpdateEvent(jId, taskLog))) } override def close(): Unit = { diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/context/SparkConfig.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/context/SparkConfig.java index 4e4654f923..24d3ddcb2c 100644 --- a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/context/SparkConfig.java +++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/context/SparkConfig.java @@ -47,6 +47,7 @@ public class SparkConfig { private String k8sSparkVersion; private String k8sNamespace; + private String k8sFileUploadPath; private String deployMode = "client"; // ("client") // todo cluster private String appResource; // ("") private String appName; // ("") @@ -73,6 +74,14 @@ public class SparkConfig { private String keytab; // ("--keytab", "") private String queue; // ("--queue", "") + public String getK8sFileUploadPath() { + return k8sFileUploadPath; + } + + public void setK8sFileUploadPath(String k8sFileUploadPath) { + this.k8sFileUploadPath = k8sFileUploadPath; + } + public String getK8sImagePullPolicy() { return k8sImagePullPolicy; } @@ -421,6 +430,9 @@ public String toString() { + ", k8sSparkVersion='" + k8sSparkVersion + '\'' + + ", k8sFileUploadPath='" + + k8sFileUploadPath + + '\'' + ", k8sNamespace='" + k8sNamespace + '\'' diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/KubernetesOperatorClusterDescriptorAdapter.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/KubernetesOperatorClusterDescriptorAdapter.java index 2f0318254e..fa6236600e 100644 --- a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/KubernetesOperatorClusterDescriptorAdapter.java +++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/KubernetesOperatorClusterDescriptorAdapter.java @@ -21,11 +21,13 @@ import org.apache.linkis.engineplugin.spark.client.context.SparkConfig; import org.apache.linkis.engineplugin.spark.client.deployment.crds.*; import org.apache.linkis.engineplugin.spark.client.deployment.util.KubernetesHelper; +import org.apache.linkis.engineplugin.spark.config.SparkConfiguration; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.apache.spark.launcher.SparkAppHandle; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; @@ -79,6 +81,7 @@ public void deployCluster(String mainClass, String args, Map con NonNamespaceOperation> sparkApplicationClient = getSparkApplicationClient(client); + SparkApplication sparkApplication = getSparkApplication(sparkConfig.getAppName(), sparkConfig.getK8sNamespace()); @@ -88,12 +91,19 @@ public void deployCluster(String mainClass, String args, Map con .memory(sparkConfig.getDriverMemory()) .serviceAccount(sparkConfig.getK8sServiceAccount()) .build(); + SparkPodSpec executor = SparkPodSpec.Builder() .cores(sparkConfig.getExecutorCores()) .instances(sparkConfig.getNumExecutors()) .memory(sparkConfig.getExecutorMemory()) .build(); + + Map sparkConfMap = new HashMap<>(); + sparkConfMap.put( + SparkConfiguration.SPARK_KUBERNETES_FILE_UPLOAD_PATH().key(), + sparkConfig.getK8sFileUploadPath()); + SparkApplicationSpec sparkApplicationSpec = SparkApplicationSpec.Builder() .type(sparkConfig.getK8sLanguageType()) @@ -107,10 +117,12 @@ public void deployCluster(String mainClass, String args, Map con .restartPolicy(new RestartPolicy(sparkConfig.getK8sRestartPolicy())) .driver(driver) .executor(executor) + .sparkConf(sparkConfMap) .build(); logger.info("Spark k8s operator task parameters: {}", sparkApplicationSpec); sparkApplication.setSpec(sparkApplicationSpec); + SparkApplication created = sparkApplicationClient.createOrReplace(sparkApplication); logger.info("Preparing to submit the Spark k8s operator Task: {}", created); diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/crds/SparkApplicationSpec.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/crds/SparkApplicationSpec.java index 80c03f2df1..be705ce400 100644 --- a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/crds/SparkApplicationSpec.java +++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/crds/SparkApplicationSpec.java @@ -17,7 +17,9 @@ package org.apache.linkis.engineplugin.spark.client.deployment.crds; +import java.util.HashMap; import java.util.List; +import java.util.Map; import io.fabric8.kubernetes.api.model.KubernetesResource; @@ -45,6 +47,16 @@ public class SparkApplicationSpec implements KubernetesResource { private SparkPodSpec executor; + private Map sparkConf; + + public Map getSparkConf() { + return sparkConf; + } + + public void setSparkConf(Map sparkConf) { + this.sparkConf = sparkConf; + } + public String getType() { return type; } @@ -165,6 +177,8 @@ public String toString() { + driver + ", executor=" + executor + + ", sparkConf=" + + sparkConf + '}'; } @@ -185,6 +199,8 @@ public static class SparkApplicationSpecBuilder { private SparkPodSpec driver; private SparkPodSpec executor; + private Map sparkConf; + private SparkApplicationSpecBuilder() {} public SparkApplicationSpecBuilder type(String type) { @@ -242,6 +258,22 @@ public SparkApplicationSpecBuilder executor(SparkPodSpec executor) { return this; } + public SparkApplicationSpecBuilder sparkConf(Map sparkConf) { + if (sparkConf == null || sparkConf.size() == 0) { + return this; + } + + if (this.sparkConf == null) { + this.sparkConf = new HashMap<>(); + } + + for (Map.Entry entry : sparkConf.entrySet()) { + this.sparkConf.put(entry.getKey(), entry.getValue()); + } + + return this; + } + public SparkApplicationSpec build() { SparkApplicationSpec sparkApplicationSpec = new SparkApplicationSpec(); sparkApplicationSpec.type = this.type; @@ -255,6 +287,7 @@ public SparkApplicationSpec build() { sparkApplicationSpec.executor = this.executor; sparkApplicationSpec.image = this.image; sparkApplicationSpec.restartPolicy = this.restartPolicy; + sparkApplicationSpec.sparkConf = this.sparkConf; return sparkApplicationSpec; } } diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/config/SparkConfiguration.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/config/SparkConfiguration.scala index b81b21e76f..ecc37597db 100644 --- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/config/SparkConfiguration.scala +++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/config/SparkConfiguration.scala @@ -62,6 +62,9 @@ object SparkConfiguration extends Logging { val SPARK_K8S_SPARK_VERSION = CommonVars[String]("linkis.spark.k8s.sparkVersion", "3.2.1") val SPARK_K8S_NAMESPACE = CommonVars[String]("linkis.spark.k8s.namespace", "default") + val SPARK_KUBERNETES_FILE_UPLOAD_PATH = + CommonVars[String]("spark.kubernetes.file.upload.path", "local:///opt/spark/tmp") + val SPARK_PYTHON_VERSION = CommonVars[String]("spark.python.version", "python") val SPARK_PYTHON_TEST_MODE_ENABLE = diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala index fef3f06999..5bf90c6bfe 100644 --- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala +++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala @@ -104,6 +104,7 @@ class SparkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging sparkConfig.setK8sPassword(SPARK_K8S_PASSWORD.getValue(options)) sparkConfig.setK8sImage(SPARK_K8S_IMAGE.getValue(options)) sparkConfig.setK8sNamespace(SPARK_K8S_NAMESPACE.getValue(options)) + sparkConfig.setK8sFileUploadPath(SPARK_KUBERNETES_FILE_UPLOAD_PATH.getValue(options)) sparkConfig.setK8sSparkVersion(SPARK_K8S_SPARK_VERSION.getValue(options)) sparkConfig.setK8sRestartPolicy(SPARK_K8S_RESTART_POLICY.getValue(options)) sparkConfig.setK8sLanguageType(SPARK_K8S_LANGUAGE_TYPE.getValue(options))