Skip to content

Commit

Permalink
[Bug] Fixed a bug in extracting program arguments. (#3297)
Browse files Browse the repository at this point in the history
* [Improve] packageArgs improvement

* minor improvement

* [Improve] minor improve

* [Improve] method extractArguments improvement

* [Improve] method extractProgramArgs improvement

* [Improve] delete|update syntax support

* [Improve] delete|update syntax minor improvement

* [Improve] jobGraph submit improvement

---------

Co-authored-by: benjobs <[email protected]>
  • Loading branch information
wolfboys and benjobs authored Oct 30, 2023
1 parent ba269cb commit 4dc5eb7
Show file tree
Hide file tree
Showing 10 changed files with 153 additions and 180 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ class FlinkVersion(val flinkHome: String) extends java.io.Serializable with Logg
lib
}

lazy val flinkLibs: List[NetURL] = flinkLib.listFiles().map(_.toURI.toURL).toList

lazy val version: String = {
val flinkVersion = new AtomicReference[String]
val cmd = List(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import java.util.regex.Pattern
import scala.collection.JavaConverters._
import scala.collection.convert.ImplicitConversions._
import scala.collection.mutable
import scala.collection.mutable.{Map => MutableMap}
import scala.collection.mutable.{ArrayBuffer, Map => MutableMap}

object PropertiesUtils extends Logger {

Expand Down Expand Up @@ -305,6 +305,30 @@ object PropertiesUtils extends Logger {
}
}

/** Splits a raw program-argument string into individual arguments.
 *
 *  Tokens are separated by whitespace; a token that starts with a single or double quote is
 *  re-joined with the following tokens until the matching closing quote is found, so quoted
 *  values may contain whitespace (e.g. --sql "select * from t"). The surrounding quotes are
 *  stripped from the extracted value.
 *
 *  @param args raw argument string; null or blank input yields an empty list
 *  @return the extracted arguments, never null
 */
def extractArguments(args: String): List[String] = {
  val programArgs = new ArrayBuffer[String]()
  // Trim before splitting: split("\\s+") on a string with leading whitespace produces an
  // empty head token, which previously leaked an empty-string argument into the result.
  if (args != null && args.trim.nonEmpty) {
    val iter = args.trim.split("\\s+").iterator
    while (iter.hasNext) {
      val token = iter.next()
      token.take(1) match {
        case quote @ ("'" | "\"") =>
          // Consume tokens until the closing quote. The length > 1 guard ensures a token that
          // is a lone quote character is not treated as already closed, which previously
          // triggered substring(1, 0) -> StringIndexOutOfBoundsException.
          var value = token
          while (!(value.length > 1 && value.endsWith(quote)) && iter.hasNext) {
            value += s" ${iter.next()}"
          }
          if (value.length > 1 && value.endsWith(quote)) {
            programArgs += value.substring(1, value.length - 1)
          } else {
            // Unterminated quote: keep the remaining content as-is instead of silently
            // dropping its last character (the previous behavior).
            programArgs += value.substring(1)
          }
        case _ => programArgs += token
      }
    }
  }
  programArgs.toList
}

@Nonnull def extractDynamicPropertiesAsJava(properties: String): JavaMap[String, String] =
new JavaMap[String, String](extractDynamicProperties(properties).asJava)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,83 +16,32 @@
*/
package org.apache.streampark.common.util

import org.apache.commons.lang3.StringUtils
import org.junit.jupiter.api.{Assertions, Test}

import scala.annotation.tailrec
import scala.collection.mutable.ArrayBuffer
import scala.language.postfixOps

class PropertiesUtilsTestCase {

@Test def testExtractProgramArgs(): Unit = {
val argsStr = "--host localhost:8123\n" +
"--sql \"insert into table_a select * from table_b\"\n" +
"--c d\r\n" +
"--including-tables \"BASE_CARD_ETPS|BASE_CHECKED_STAT\"\n"
val programArgs = new ArrayBuffer[String]()
if (StringUtils.isNotBlank(argsStr)) {
val multiChar = "\""
val array = argsStr.split("\\s+")
if (!array.exists(_.startsWith(multiChar))) {
array.foreach(
x => {
if (x.trim.nonEmpty) {
programArgs += x
}
})
} else {
val argsArray = new ArrayBuffer[String]()
val tempBuffer = new ArrayBuffer[String]()

@tailrec
def processElement(index: Int, multi: Boolean): Unit = {

if (index == array.length) {
if (tempBuffer.nonEmpty) {
argsArray += tempBuffer.mkString(" ")
}
return
}

val next = index + 1
val elem = array(index).trim
val until = if (elem.endsWith(multiChar)) 1 else 0

if (elem.isEmpty) {
processElement(next, multi = false)
} else {
if (multi) {
if (elem.endsWith(multiChar)) {
tempBuffer += elem.dropRight(1)
argsArray += tempBuffer.mkString(" ")
tempBuffer.clear()
processElement(next, multi = false)
} else {
tempBuffer += elem
processElement(next, multi)
}
} else {
if (elem.startsWith(multiChar)) {
tempBuffer += elem.drop(1).dropRight(until)
processElement(next, multi = true)
} else {
argsArray += elem.dropRight(until)
processElement(next, multi = false)
}
}
}
}

processElement(0, multi = false)
argsArray.foreach(x => programArgs += x)
}
}

Assertions.assertEquals("localhost:8123", programArgs(1))
Assertions.assertEquals("insert into table_a select * from table_b", programArgs(3))
Assertions.assertEquals("d", programArgs(5))
Assertions.assertEquals("BASE_CARD_ETPS|BASE_CHECKED_STAT", programArgs(7))
val args =
"mysql-sync-database " +
"--database employees " +
"--mysql-conf hostname=127.0.0.1 " +
"--mysql-conf port=3306 " +
"--mysql-conf username=root " +
"--mysql-conf password=123456 " +
"--mysql-conf database-name=employees " +
"--including-tables 'test|test.*' " +
"--excluding-tables \"emp_*\" " +
"--query 'select * from employees where age > 20' " +
"--sink-conf fenodes=127.0.0.1:8030 " +
"--sink-conf username=root " +
"--sink-conf password= " +
"--sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 " +
"--sink-conf sink.label-prefix=label" +
"--table-conf replication_num=1"
val programArgs = PropertiesUtils.extractArguments(args)
println(programArgs)
}

@Test def testDynamicProperties(): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ object KubernetesNativeSessionClient extends KubernetesNativeClientTrait with Lo
StringUtils.isNotBlank(submitRequest.k8sSubmitParam.clusterId),
s"[flink-submit] submit flink job failed, clusterId is null, mode=${flinkConfig.get(DeploymentOptions.TARGET)}"
)
super.trySubmit(submitRequest, flinkConfig)(restApiSubmit)(jobGraphSubmit)
super.trySubmit(submitRequest, flinkConfig)(jobGraphSubmit, restApiSubmit)
}

/** Submit flink session job via rest api. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,7 @@ object RemoteClient extends FlinkClientTrait {
submitRequest: SubmitRequest,
flinkConfig: Configuration): SubmitResponse = {
// submit job
super.trySubmit(submitRequest, flinkConfig)(restApiSubmit)(jobGraphSubmit)

super.trySubmit(submitRequest, flinkConfig)(jobGraphSubmit, restApiSubmit)
}

override def doCancel(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,10 +102,8 @@ object YarnApplicationClient extends YarnClientTrait {
throw new RuntimeException(s"$pyVenv File does not exist")
}

val localLib: String = s"${Workspace.local.APP_WORKSPACE}/${submitRequest.id}/lib"
if (FileUtils.exists(localLib) && FileUtils.directoryNotBlank(localLib)) {
flinkConfig.safeSet(PipelineOptions.JARS, util.Arrays.asList(localLib))
}
// including $app/lib
includingPipelineJars(submitRequest, flinkConfig)

// yarn.ship-files
val shipFiles = new util.ArrayList[String]()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@
package org.apache.streampark.flink.client.`trait`

import org.apache.streampark.common.Constant
import org.apache.streampark.common.conf.{ConfigKeys, Workspace}
import org.apache.streampark.common.conf.ConfigKeys._
import org.apache.streampark.common.conf.Workspace
import org.apache.streampark.common.enums.{ApplicationType, FlinkDevelopmentMode, FlinkExecutionMode, FlinkRestoreMode}
import org.apache.streampark.common.fs.FsOperator
import org.apache.streampark.common.util.{DeflaterUtils, FileUtils, Logger, SystemPropertyUtils}
import org.apache.streampark.common.util.{DeflaterUtils, ExceptionUtils, FileUtils, Logger, PropertiesUtils, SystemPropertyUtils, Utils}
import org.apache.streampark.flink.client.bean._
import org.apache.streampark.flink.core.FlinkClusterClient
import org.apache.streampark.flink.core.conf.FlinkRunOption
Expand Down Expand Up @@ -217,57 +217,79 @@ trait FlinkClientTrait extends Logger {
def doCancel(cancelRequest: CancelRequest, flinkConf: Configuration): CancelResponse

/** Submits a Flink job, preferring the JobGraph submit plan and falling back to the
 *  REST API submit plan if the JobGraph submission fails.
 *
 *  @param submitRequest the submit request describing the job
 *  @param flinkConfig   the effective Flink configuration for this submission
 *  @param jobGraphFunc  primary submit plan (JobGraph-based)
 *  @param restApiFunc   backup submit plan (REST API-based)
 *  @return the response of whichever plan succeeded
 *  @throws Exception the REST API failure if both plans fail
 */
def trySubmit(submitRequest: SubmitRequest, flinkConfig: Configuration)(
    jobGraphFunc: (SubmitRequest, Configuration) => SubmitResponse,
    restApiFunc: (SubmitRequest, Configuration) => SubmitResponse): SubmitResponse = {
  // Prioritize using JobGraph submit plan while using Rest API submit plan as backup
  Try {
    logInfo(s"[flink-submit] Submit job with JobGraph Plan.")
    jobGraphFunc(submitRequest, flinkConfig)
  } match {
    case Success(v) => v
    case Failure(jobGraphFailure) =>
      logWarn(
        s"""\n
           |[flink-submit] JobGraph Submit Plan failed, error detail:
           |------------------------------------------------------------------
           |${ExceptionUtils.stringifyException(jobGraphFailure)}
           |------------------------------------------------------------------
           |Now retry submit with RestAPI Plan ...
           |""".stripMargin
      )
      Try(restApiFunc(submitRequest, flinkConfig)) match {
        case Success(r) => r
        // Distinct binding (vs. the outer failure) avoids shadowing; the REST failure is
        // the one propagated to the caller when both plans fail.
        case Failure(restApiFailure) =>
          logError(
            s"""\n
               |[flink-submit] RestAPI Submit failed, error detail:
               |------------------------------------------------------------------
               |${ExceptionUtils.stringifyException(restApiFailure)}
               |------------------------------------------------------------------
               |Both JobGraph submit plan and Rest API submit plan all failed!
               |""".stripMargin
          )
          throw restApiFailure
      }
  }
}

private[client] def getJobGraph(
submitRequest: SubmitRequest,
flinkConfig: Configuration): (PackagedProgram, JobGraph) = {
if (submitRequest.developmentMode == FlinkDevelopmentMode.PYFLINK) {
val pythonVenv: String = Workspace.local.APP_PYTHON_VENV
if (!FsOperator.lfs.exists(pythonVenv)) {
throw new RuntimeException(s"$pythonVenv File does not exist")
}

val localLib: String = s"${Workspace.local.APP_WORKSPACE}/${submitRequest.id}/lib"
if (FileUtils.exists(localLib) && FileUtils.directoryNotBlank(localLib)) {
flinkConfig.safeSet(PipelineOptions.JARS, util.Arrays.asList(localLib))
}

flinkConfig
// python.archives
.safeSet(PythonOptions.PYTHON_ARCHIVES, pythonVenv)
// python.client.executable
.safeSet(PythonOptions.PYTHON_CLIENT_EXECUTABLE, Constant.PYTHON_EXECUTABLE)
// python.executable
.safeSet(PythonOptions.PYTHON_EXECUTABLE, Constant.PYTHON_EXECUTABLE)
}

val packageProgram = PackagedProgram.newBuilder
val pkgBuilder = PackagedProgram.newBuilder
.setUserClassPaths(
Lists.newArrayList(submitRequest.flinkVersion.flinkLibs: _*)
)
.setEntryPointClassName(
flinkConfig.getOptional(ApplicationConfiguration.APPLICATION_MAIN_CLASS).get()
)
.setArguments(
flinkConfig
.getOptional(ApplicationConfiguration.APPLICATION_ARGS)
.orElse(Lists.newArrayList()): _*)
.setEntryPointClassName(
flinkConfig.getOptional(ApplicationConfiguration.APPLICATION_MAIN_CLASS).get()
.orElse(Lists.newArrayList()): _*
)
.setSavepointRestoreSettings(submitRequest.savepointRestoreSettings)
.build()

submitRequest.developmentMode match {
case FlinkDevelopmentMode.PYFLINK =>
val pythonVenv: String = Workspace.local.APP_PYTHON_VENV
if (!FsOperator.lfs.exists(pythonVenv)) {
throw new RuntimeException(s"$pythonVenv File does not exist")
}
// including $app/lib
includingPipelineJars(submitRequest, flinkConfig)
flinkConfig
// python.archives
.safeSet(PythonOptions.PYTHON_ARCHIVES, pythonVenv)
// python.client.executable
.safeSet(PythonOptions.PYTHON_CLIENT_EXECUTABLE, Constant.PYTHON_EXECUTABLE)
// python.executable
.safeSet(PythonOptions.PYTHON_EXECUTABLE, Constant.PYTHON_EXECUTABLE)
case _ =>
pkgBuilder.setJarFile(submitRequest.userJarFile)
}

val packageProgram = pkgBuilder.build()
val jobGraph = PackagedProgramUtils.createJobGraph(
packageProgram,
flinkConfig,
Expand Down Expand Up @@ -433,60 +455,7 @@ trait FlinkClientTrait extends Logger {

private[this] def extractProgramArgs(submitRequest: SubmitRequest): JavaList[String] = {
val programArgs = new ArrayBuffer[String]()
val args = submitRequest.args

if (StringUtils.isNotBlank(args)) {
val multiChar = "\""
val array = args.split("\\s+")
if (!array.exists(_.startsWith(multiChar))) {
array.foreach(programArgs +=)
} else {
val argsArray = new ArrayBuffer[String]()
val tempBuffer = new ArrayBuffer[String]()

@tailrec
def processElement(index: Int, multi: Boolean): Unit = {

if (index == array.length) {
if (tempBuffer.nonEmpty) {
argsArray += tempBuffer.mkString(" ")
}
return
}

val next = index + 1
val elem = array(index).trim

if (elem.isEmpty) {
processElement(next, multi = false)
} else {
if (multi) {
if (elem.endsWith(multiChar)) {
tempBuffer += elem.dropRight(1)
argsArray += tempBuffer.mkString(" ")
tempBuffer.clear()
processElement(next, multi = false)
} else {
tempBuffer += elem
processElement(next, multi)
}
} else {
val until = if (elem.endsWith(multiChar)) 1 else 0
if (elem.startsWith(multiChar)) {
tempBuffer += elem.drop(1).dropRight(until)
processElement(next, multi = true)
} else {
argsArray += elem.dropRight(until)
processElement(next, multi = false)
}
}
}
}

processElement(0, multi = false)
argsArray.foreach(x => programArgs += x)
}
}
programArgs ++= PropertiesUtils.extractArguments(submitRequest.args)

if (submitRequest.applicationType == ApplicationType.STREAMPARK_FLINK) {

Expand Down Expand Up @@ -522,7 +491,7 @@ trait FlinkClientTrait extends Logger {
programArgs.add("-py")
programArgs.add(submitRequest.userJarFile.getAbsolutePath)
}
programArgs.toList.asJava
Lists.newArrayList(programArgs: _*)
}

private[this] def applyConfiguration(
Expand Down Expand Up @@ -626,4 +595,13 @@ trait FlinkClientTrait extends Logger {
clientWrapper.triggerSavepoint(jobID, savepointPath, savepointRequest.nativeFormat).get()
}

/** Registers the application's local lib directory ($APP_WORKSPACE/&lt;appId&gt;/lib) as the
 *  pipeline jars in the Flink configuration. Does nothing when the directory is missing
 *  or contains no files.
 */
private[client] def includingPipelineJars(
    submitRequest: SubmitRequest,
    flinkConfig: Configuration) = {
  val appLibDir = s"${Workspace.local.APP_WORKSPACE}/${submitRequest.id}/lib"
  val hasLibJars = FileUtils.exists(appLibDir) && FileUtils.directoryNotBlank(appLibDir)
  if (hasLibJars) {
    flinkConfig.safeSet(PipelineOptions.JARS, util.Arrays.asList(appLibDir))
  }
}

}
Loading

0 comments on commit 4dc5eb7

Please sign in to comment.