Skip to content

Commit

Permalink
[Fix] Fix pyflink yarn remote mode bug
Browse files Browse the repository at this point in the history
  • Loading branch information
ChengJie1053 committed Nov 13, 2023
1 parent 1ced87e commit e905c7c
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,13 @@ case class SubmitRequest(
lazy val effectiveAppName: String =
if (this.appName == null) appProperties(KEY_FLINK_APP_NAME) else this.appName

lazy val classPaths: List[URL] = {
lazy val libs: List[URL] = {
val path = s"${Workspace.local.APP_WORKSPACE}/$id/lib"
val lib = Try(new File(path).listFiles().map(_.toURI.toURL).toList).getOrElse(List.empty[URL])
flinkVersion.flinkLibs ++ lib
Try(new File(path).listFiles().map(_.toURI.toURL).toList).getOrElse(List.empty[URL])
}

lazy val classPaths: List[URL] = flinkVersion.flinkLibs ++ libs

lazy val flinkSQL: String = extraParameter.get(KEY_FLINK_SQL()).toString

lazy val allowNonRestoredState: Boolean = Try(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ import org.apache.flink.runtime.jobgraph.{JobGraph, SavepointConfigOptions}
import org.apache.flink.util.FlinkException
import org.apache.flink.util.Preconditions.checkNotNull

import java.io.File
import java.util.{Collections, List => JavaList, Map => JavaMap}

import scala.collection.convert.ImplicitConversions._
Expand Down Expand Up @@ -255,7 +254,6 @@ trait FlinkClientTrait extends Logger {
flinkConfig: Configuration): (PackagedProgram, JobGraph) = {

val pkgBuilder = PackagedProgram.newBuilder
.setUserClassPaths(Lists.newArrayList(submitRequest.classPaths: _*))
.setEntryPointClassName(
flinkConfig.getOptional(ApplicationConfiguration.APPLICATION_MAIN_CLASS).get()
)
Expand All @@ -272,23 +270,20 @@ trait FlinkClientTrait extends Logger {
if (!FsOperator.lfs.exists(pythonVenv)) {
throw new RuntimeException(s"$pythonVenv File does not exist")
}
// including $app/lib
val localLib: String = s"${Workspace.local.APP_WORKSPACE}/${submitRequest.id}/lib"
if (FileUtils.exists(localLib) && FileUtils.directoryNotBlank(localLib)) {
val localLibUrl = new File(localLib).listFiles().map(_.toURI.toURL).toList
pkgBuilder.setUserClassPaths(
Lists.newArrayList(localLibUrl: _*)
)
}
flinkConfig
// python.archives
.safeSet(PythonOptions.PYTHON_ARCHIVES, pythonVenv)
// python.client.executable
.safeSet(PythonOptions.PYTHON_CLIENT_EXECUTABLE, Constant.PYTHON_EXECUTABLE)
// python.executable
.safeSet(PythonOptions.PYTHON_EXECUTABLE, Constant.PYTHON_EXECUTABLE)
if (submitRequest.libs.nonEmpty) {
pkgBuilder.setUserClassPaths(submitRequest.libs)
}
case _ =>
pkgBuilder.setJarFile(submitRequest.userJarFile)
pkgBuilder
.setUserClassPaths(submitRequest.classPaths)
.setJarFile(submitRequest.userJarFile)
}

val packageProgram = pkgBuilder.build()
Expand Down

0 comments on commit e905c7c

Please sign in to comment.