[Improve] submit flink job userclassPaths improvement (#3313)
* [Improve] submit flink job userclassPaths improvement

* userClasspath minor improvement
wolfboys authored Nov 5, 2023
1 parent 3c8bd0b commit 5b727cc
Showing 5 changed files with 15 additions and 226 deletions.
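The net effect of the change, condensed into a minimal sketch (a hypothetical standalone helper; UserClassPathSketch, resolveClassPaths, and its parameters stand in for the values that SubmitRequest and FlinkVersion provide in the diff below):

import java.io.File
import java.net.URL

import scala.util.Try

object UserClassPathSketch {

  // Merge the Flink distribution jars with the per-application lib directory
  // ($appWorkspace/$id/lib) into one List[URL] suitable for
  // PackagedProgram.newBuilder.setUserClassPaths.
  def resolveClassPaths(flinkLibs: List[URL], appWorkspace: String, id: Long): List[URL] = {
    val path = s"$appWorkspace/$id/lib"
    // listFiles() returns null when the directory is missing, so wrap it in Try
    // and fall back to an empty list, mirroring the getOrElse in the diff.
    val appLib = Try(new File(path).listFiles().map(_.toURI.toURL).toList)
      .getOrElse(List.empty[URL])
    flinkLibs ++ appLib
  }
}

With the combined list computed once in SubmitRequest.classPaths, the separate includingPipelineJars helper and the ad-hoc $app/lib handling in the client trait become redundant and are removed below.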
@@ -18,7 +18,7 @@
 package org.apache.streampark.flink.client.bean
 
 import org.apache.streampark.common.Constant
-import org.apache.streampark.common.conf.{ConfigKeys, FlinkVersion, Workspace}
+import org.apache.streampark.common.conf.{FlinkVersion, Workspace}
 import org.apache.streampark.common.conf.ConfigKeys._
 import org.apache.streampark.common.enums._
 import org.apache.streampark.common.util.{DeflaterUtils, HdfsUtils, PropertiesUtils}
@@ -32,6 +32,7 @@ import org.apache.flink.runtime.jobgraph.{SavepointConfigOptions, SavepointResto
 import javax.annotation.Nullable
 
 import java.io.File
+import java.net.URL
 import java.util.{Map => JavaMap}
 
 import scala.collection.convert.ImplicitConversions._
@@ -55,7 +56,8 @@ case class SubmitRequest(
     @Nullable k8sSubmitParam: KubernetesSubmitParam,
     @Nullable extraParameter: JavaMap[String, Any]) {
 
-  lazy val appProperties: Map[String, String] = getParameterMap(KEY_FLINK_PROPERTY_PREFIX)
+  private[this] lazy val appProperties: Map[String, String] = getParameterMap(
+    KEY_FLINK_PROPERTY_PREFIX)
 
   lazy val appOption: Map[String, String] = getParameterMap(KEY_FLINK_OPTION_PREFIX)

@@ -68,9 +70,15 @@
   lazy val effectiveAppName: String =
     if (this.appName == null) appProperties(KEY_FLINK_APP_NAME) else this.appName
 
+  lazy val classPaths: List[URL] = {
+    val path = s"${Workspace.local.APP_WORKSPACE}/$id/lib"
+    val lib = Try(new File(path).listFiles().map(_.toURI.toURL).toList).getOrElse(List.empty[URL])
+    flinkVersion.flinkLibs ++ lib
+  }
+
   lazy val flinkSQL: String = extraParameter.get(KEY_FLINK_SQL()).toString
 
-  lazy val allowNonRestoredState = Try(
+  lazy val allowNonRestoredState: Boolean = Try(
     properties.get(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE.key).toString.toBoolean)
     .getOrElse(false)


This file was deleted.

@@ -101,10 +101,6 @@ object YarnApplicationClient extends YarnClientTrait {
       if (!FsOperator.hdfs.exists(pyVenv)) {
         throw new RuntimeException(s"$pyVenv File does not exist")
       }
-
-      // including $app/lib
-      includingPipelineJars(submitRequest, flinkConfig)
-
       // yarn.ship-files
       val shipFiles = new util.ArrayList[String]()
       shipFiles.add(submitRequest.userJarFile.getParentFile.getAbsolutePath)
@@ -20,7 +20,7 @@ package org.apache.streampark.flink.client.`trait`
 import org.apache.streampark.common.Constant
 import org.apache.streampark.common.conf.ConfigKeys._
 import org.apache.streampark.common.conf.Workspace
-import org.apache.streampark.common.enums.{ApplicationType, FlinkDevelopmentMode, FlinkExecutionMode, FlinkRestoreMode}
+import org.apache.streampark.common.enums._
 import org.apache.streampark.common.fs.FsOperator
 import org.apache.streampark.common.util._
 import org.apache.streampark.flink.client.bean._
@@ -43,6 +43,7 @@ import org.apache.flink.util.FlinkException
 import org.apache.flink.util.Preconditions.checkNotNull
 
 import java.io.File
+import java.net.URL
 import java.util
 import java.util.{Collections, List => JavaList, Map => JavaMap}

@@ -256,9 +257,7 @@
       flinkConfig: Configuration): (PackagedProgram, JobGraph) = {
 
     val pkgBuilder = PackagedProgram.newBuilder
-      .setUserClassPaths(
-        Lists.newArrayList(submitRequest.flinkVersion.flinkLibs: _*)
-      )
+      .setUserClassPaths(Lists.newArrayList(submitRequest.classPaths: _*))
       .setEntryPointClassName(
         flinkConfig.getOptional(ApplicationConfiguration.APPLICATION_MAIN_CLASS).get()
       )
@@ -275,14 +274,6 @@
       if (!FsOperator.lfs.exists(pythonVenv)) {
         throw new RuntimeException(s"$pythonVenv File does not exist")
       }
-      // including $app/lib
-      val localLib: String = s"${Workspace.local.APP_WORKSPACE}/${submitRequest.id}/lib"
-      if (FileUtils.exists(localLib) && FileUtils.directoryNotBlank(localLib)) {
-        val localLibUrl = new File(localLib).listFiles().map(_.toURI.toURL).toList
-        pkgBuilder.setUserClassPaths(
-          Lists.newArrayList(localLibUrl: _*)
-        )
-      }
       flinkConfig
         // python.archives
         .safeSet(PythonOptions.PYTHON_ARCHIVES, pythonVenv)
@@ -600,13 +591,4 @@
     clientWrapper.triggerSavepoint(jobID, savepointPath, savepointRequest.nativeFormat).get()
   }
 
-  private[client] def includingPipelineJars(
-      submitRequest: SubmitRequest,
-      flinkConfig: Configuration) = {
-    val localLib: String = s"${Workspace.local.APP_WORKSPACE}/${submitRequest.id}/lib"
-    if (FileUtils.exists(localLib) && FileUtils.directoryNotBlank(localLib)) {
-      flinkConfig.safeSet(PipelineOptions.JARS, util.Arrays.asList(localLib))
-    }
-  }
-
 }
@@ -73,7 +73,7 @@ object FlinkSqlValidator extends Logger {
         var hasInsert = false
         for (call <- sqlCommands) {
           val args = call.operands.head
-          lazy val command = call.command
+          val command = call.command
           command match {
             case SET | RESET =>
               if (command == SET && args == TableConfigOptions.TABLE_SQL_DIALECT.key()) {
