From a73f3514c060ab33c282197cecec0cfb52145648 Mon Sep 17 00:00:00 2001 From: benjobs Date: Sat, 11 Nov 2023 11:46:08 +0800 Subject: [PATCH] [Bug] extract programArgs bug fixed --- .../streampark/common/util/PropertiesUtils.scala | 13 ++++++++++++- .../common/util/PropertiesUtilsTestCase.scala | 16 ++++++++++++++-- 2 files changed, 26 insertions(+), 3 deletions(-) diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala index 3098daa5da..35facd89f1 100644 --- a/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala @@ -322,7 +322,18 @@ object PropertiesUtils extends Logger { } } programArgs += value.substring(1, value.length - 1) - case _ => programArgs += v + case _ => + val regexp = "(.*)='(.*)'$" + if (v.matches(regexp)) { + programArgs += v.replaceAll(regexp, "$1=$2") + } else { + val regexp = "(.*)=\"(.*)\"$" + if (v.matches(regexp)) { + programArgs += v.replaceAll(regexp, "$1=$2") + } else { + programArgs += v + } + } } } } diff --git a/streampark-common/src/test/scala/org/apache/streampark/common/util/PropertiesUtilsTestCase.scala b/streampark-common/src/test/scala/org/apache/streampark/common/util/PropertiesUtilsTestCase.scala index 995cfd2ece..827d0f11cc 100644 --- a/streampark-common/src/test/scala/org/apache/streampark/common/util/PropertiesUtilsTestCase.scala +++ b/streampark-common/src/test/scala/org/apache/streampark/common/util/PropertiesUtilsTestCase.scala @@ -24,8 +24,20 @@ import scala.language.postfixOps class PropertiesUtilsTestCase { @Test def testExtractProgramArgs(): Unit = { - val args = - "mysql-sync-database \n--database employees \n--mysql-conf hostname=127.0.0.1 \n--mysql-conf port=3306 \n--mysql-conf username=root \n--mysql-conf password=123456 \n--mysql-conf database-name=employees \n--including-tables 'test|test.*' \n--sink-conf fenodes=127.0.0.1:8030 \n--sink-conf username=root \n--sink-conf password= \n--sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \n--sink-conf sink.label-prefix=label\n--table-conf replication_num=1 " + val args = "mysql-sync-table \n" + + "--warehouse hdfs:///paimon \n" + + "--database test_db \n" + + "--table test_table \n" + + "--mysql-conf hostname=localhost \n" + + "--mysql-conf username=root \n" + + "--mysql-conf password=123456 \n" + + "--mysql-conf database-name='employees' \n" + + "--mysql-conf table-name='employees' \n" + + "--catalog-conf metastore=hive \n" + + "--catalog-conf uri=thrift://localhost:9083 \n" + + "--table-conf bucket=1 \n" + + "--table-conf changelog-producer=input \n" + + "--table-conf sink.parallelism=1" val programArgs = new ArrayBuffer[String]() programArgs ++= PropertiesUtils.extractArguments(args) println(programArgs)