diff --git a/streampark-common/src/main/java/org/apache/streampark/common/Constant.java b/streampark-common/src/main/java/org/apache/streampark/common/Constant.java index ac02bb8a17..553d68bb3c 100644 --- a/streampark-common/src/main/java/org/apache/streampark/common/Constant.java +++ b/streampark-common/src/main/java/org/apache/streampark/common/Constant.java @@ -18,15 +18,35 @@ package org.apache.streampark.common; /** A constant class to hold the constants variables. */ -public class Constant { +public final class Constant { private Constant() {} public static final String DEFAULT = "default"; + public static final String STREAM_PARK = "streampark"; + public static final String HTTP_SCHEMA = "http://"; + public static final String HTTPS_SCHEMA = "https://"; + public static final String JAR_SUFFIX = ".jar"; + public static final String ZIP_SUFFIX = ".zip"; + + public static final String EMPTY_STRING = ""; + + public static final String PYTHON_SUFFIX = ".py"; + public static final String SEMICOLON = ";"; + + public static final String DEFAULT_DATAMASK_STRING = "********"; + + public static final String PYTHON_FLINK_DRIVER_CLASS_NAME = + "org.apache.flink.client.python.PythonDriver"; + + public static final String STREAMPARK_FLINKSQL_CLIENT_CLASS = + "org.apache.streampark.flink.cli.SqlClient"; + + public static final String PYTHON_EXECUTABLE = "venv.zip/venv/bin/python3"; } diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala b/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigKeys.scala similarity index 75% rename from streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala rename to streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigKeys.scala index f465f25e5e..f2fb98fb03 100644 --- a/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigKeys.scala @@ -16,22 +16,10 @@ */ package org.apache.streampark.common.conf -import java.time.LocalDateTime - -object ConfigConst { - - /** common const */ - val DEFAULT_DATAMASK_STRING = "********" +object ConfigKeys { val PARAM_PREFIX = "--" - /** pyflink */ - val PYTHON_SUFFIX = ".py" - - val PYTHON_DRIVER_CLASS_NAME = "org.apache.flink.client.python.PythonDriver" - - val PYTHON_EXECUTABLE = "venv.zip/venv/bin/python3" - /** about parameter... */ val KEY_APP_HOME = "app.home" @@ -49,9 +37,6 @@ object ConfigConst { val KEY_SEMANTIC = "semantic" - /** sign.... */ - val SIGN_EMPTY = "" - /** kerberos */ val KEY_KERBEROS = "kerberos" @@ -180,23 +165,4 @@ object ConfigConst { val KEY_FLINK_TM_PROCESS_MEMORY = "taskmanager.memory.process.size" - val STREAMPARK_FLINKSQL_CLIENT_CLASS = "org.apache.streampark.flink.cli.SqlClient" - - def printLogo(info: String): Unit = { - // scalastyle:off println - println("\n") - println(" _____ __ __ ") - println(" / ___// /_________ ____ _____ ___ ____ ____ ______/ /__ ") - println(" \\__ \\/ __/ ___/ _ \\/ __ `/ __ `__ \\/ __ \\ __ `/ ___/ //_/") - println(" ___/ / /_/ / / __/ /_/ / / / / / / /_/ / /_/ / / / ,< ") - println(" /____/\\__/_/ \\___/\\__,_/_/ /_/ /_/ ____/\\__,_/_/ /_/|_| ") - println(" /_/ \n\n") - println(" Version: 2.2.0-SNAPSHOT ") - println(" WebSite: https://streampark.apache.org ") - println(" GitHub : https://github.com/apache/incubator-streampark ") - println(s" Info : $info ") - println(s" Time : ${LocalDateTime.now} \n\n") - // scalastyle:on println - } - } diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/conf/InternalConfigHolder.scala b/streampark-common/src/main/scala/org/apache/streampark/common/conf/InternalConfigHolder.scala index f6631b697f..18aae4e8e3 100644 --- a/streampark-common/src/main/scala/org/apache/streampark/common/conf/InternalConfigHolder.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/conf/InternalConfigHolder.scala @@ -17,6 +17,7 @@ package org.apache.streampark.common.conf +import org.apache.streampark.common.Constant import org.apache.streampark.common.util.{Logger, SystemPropertyUtils} import org.apache.streampark.common.util.ImplicitsUtils._ @@ -164,7 +165,7 @@ object InternalConfigHolder extends Logger { | ${configKeys .map( key => - s"$key = ${if (key.contains("password")) ConfigConst.DEFAULT_DATAMASK_STRING + s"$key = ${if (key.contains("password")) Constant.DEFAULT_DATAMASK_STRING else get(key)}") .mkString("\n ")}""".stripMargin) } diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/ConfigUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/ConfigUtils.scala index e0c159299d..4cb23a2ad4 100644 --- a/streampark-common/src/main/scala/org/apache/streampark/common/util/ConfigUtils.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/ConfigUtils.scala @@ -16,7 +16,8 @@ */ package org.apache.streampark.common.util -import org.apache.streampark.common.conf.ConfigConst._ +import org.apache.streampark.common.Constant +import org.apache.streampark.common.conf.ConfigKeys._ import java.util.{Map => JavaMap, Properties} @@ -52,7 +53,7 @@ object ConfigUtils { val kafkaProperty = new Properties() param.foreach(x => kafkaProperty.put(x._1, x._2.trim)) val _topic = topic match { - case SIGN_EMPTY => + case Constant.EMPTY_STRING => val top = kafkaProperty.getOrElse(KEY_KAFKA_TOPIC, null) if (top == null || top.split(",|\\s+").length > 1) { throw new IllegalArgumentException( diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/HBaseClient.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/HBaseClient.scala index f128f252bd..e1ec974e7e 100644 --- a/streampark-common/src/main/scala/org/apache/streampark/common/util/HBaseClient.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/HBaseClient.scala @@ -16,7 +16,7 @@ */ package org.apache.streampark.common.util -import org.apache.streampark.common.conf.ConfigConst._ +import org.apache.streampark.common.conf.ConfigKeys._ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.hbase.{HBaseConfiguration, TableName} diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopUtils.scala index a2ae4bb905..93d0573a8b 100644 --- a/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopUtils.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopUtils.scala @@ -17,8 +17,8 @@ package org.apache.streampark.common.util -import org.apache.streampark.common.conf.{CommonConfig, ConfigConst, InternalConfigHolder} -import org.apache.streampark.common.conf.ConfigConst._ +import org.apache.streampark.common.conf.{CommonConfig, ConfigKeys, InternalConfigHolder} +import org.apache.streampark.common.conf.ConfigKeys._ import org.apache.commons.collections.CollectionUtils import org.apache.commons.lang3.StringUtils @@ -62,7 +62,7 @@ object HadoopUtils extends Logger { InternalConfigHolder.get(CommonConfig.STREAMPARK_HADOOP_USER_NAME) private[this] lazy val kerberosConf: Map[String, String] = - SystemPropertyUtils.get(ConfigConst.KEY_APP_HOME, null) match { + SystemPropertyUtils.get(ConfigKeys.KEY_APP_HOME, null) match { case null => getClass.getResourceAsStream("/kerberos.yml") match { case x if x != null => PropertiesUtils.fromYamlFile(x) diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/JdbcUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/JdbcUtils.scala index 6029d88e9f..22bbfb889c 100644 --- a/streampark-common/src/main/scala/org/apache/streampark/common/util/JdbcUtils.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/JdbcUtils.scala @@ -16,7 +16,7 @@ */ package org.apache.streampark.common.util -import org.apache.streampark.common.conf.ConfigConst._ +import org.apache.streampark.common.conf.ConfigKeys._ import com.zaxxer.hikari.{HikariConfig, HikariDataSource} diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/MongoConfig.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/MongoConfig.scala index e04c97d132..7a6764df99 100644 --- a/streampark-common/src/main/scala/org/apache/streampark/common/util/MongoConfig.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/MongoConfig.scala @@ -16,7 +16,7 @@ */ package org.apache.streampark.common.util -import org.apache.streampark.common.conf.ConfigConst._ +import org.apache.streampark.common.conf.ConfigKeys._ import com.mongodb._ diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/RedisClient.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/RedisClient.scala index 2b2a4420f1..c208411cb8 100644 --- a/streampark-common/src/main/scala/org/apache/streampark/common/util/RedisClient.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/RedisClient.scala @@ -17,7 +17,8 @@ package org.apache.streampark.common.util -import org.apache.streampark.common.conf.ConfigConst +import org.apache.streampark.common.Constant +import org.apache.streampark.common.conf.ConfigKeys import redis.clients.jedis._ import redis.clients.jedis.exceptions.JedisConnectionException @@ -92,7 +93,7 @@ object RedisClient extends Logger { * @return */ def createJedisPool(endpoint: RedisEndpoint): JedisPool = { - val endpointEn: RedisEndpoint = endpoint.copy(auth = ConfigConst.DEFAULT_DATAMASK_STRING) + val endpointEn: RedisEndpoint = endpoint.copy(auth = Constant.DEFAULT_DATAMASK_STRING) logInfo(s"[StreamPark] RedisClient: createJedisPool with $endpointEn ") new JedisPool( poolConfig, diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/RedisEndpoint.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/RedisEndpoint.scala index ab765e6914..82de514212 100644 --- a/streampark-common/src/main/scala/org/apache/streampark/common/util/RedisEndpoint.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/RedisEndpoint.scala @@ -17,7 +17,7 @@ package org.apache.streampark.common.util -import org.apache.streampark.common.conf.ConfigConst._ +import org.apache.streampark.common.conf.ConfigKeys._ import redis.clients.jedis.{Jedis, Protocol} import redis.clients.jedis.util.{JedisClusterCRC16, JedisURIHelper, SafeEncoder} diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala index d56322982b..5ab002ab22 100644 --- a/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala @@ -20,7 +20,7 @@ import org.apache.commons.lang3.StringUtils import java.io._ import java.net.URL -import java.time.Duration +import java.time.{Duration, LocalDateTime} import java.util.{jar, Collection => JavaCollection, Map => JavaMap, Properties, UUID} import java.util.concurrent.locks.LockSupport import java.util.jar.{JarFile, JarInputStream} @@ -156,4 +156,21 @@ object Utils extends Logger { } } + def printLogo(info: String): Unit = { + // scalastyle:off println + println("\n") + println(" _____ __ __ ") + println(" / ___// /_________ ____ _____ ___ ____ ____ ______/ /__ ") + println(" \\__ \\/ __/ ___/ _ \\/ __ `/ __ `__ \\/ __ \\ __ `/ ___/ //_/") + println(" ___/ / /_/ / / __/ /_/ / / / / / / /_/ / /_/ / / / ,< ") + println(" /____/\\__/_/ \\___/\\__,_/_/ /_/ /_/ ____/\\__,_/_/ /_/|_| ") + println(" /_/ \n\n") + println(" Version: 2.2.0-SNAPSHOT ") + println(" WebSite: https://streampark.apache.org ") + println(" GitHub : https://github.com/apache/incubator-streampark ") + println(s" Info : $info ") + println(s" Time : ${LocalDateTime.now} \n\n") + // scalastyle:on println + } + } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/WebUtils.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/WebUtils.java index cfef4cbe06..7313788e5e 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/WebUtils.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/WebUtils.java @@ -17,7 +17,7 @@ package org.apache.streampark.console.base.util; -import org.apache.streampark.common.conf.ConfigConst; +import org.apache.streampark.common.conf.ConfigKeys; import org.apache.commons.lang3.StringUtils; @@ -97,7 +97,7 @@ public static String camelToUnderscore(String value) { } public static String getAppHome() { - return System.getProperty(ConfigConst.KEY_APP_HOME()); + return System.getProperty(ConfigKeys.KEY_APP_HOME()); } public static File getAppDir(String dir) { diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java index 6707950128..09deab73da 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java @@ -18,7 +18,7 @@ package org.apache.streampark.console.core.entity; import org.apache.streampark.common.Constant; -import org.apache.streampark.common.conf.ConfigConst; +import org.apache.streampark.common.conf.ConfigKeys; import org.apache.streampark.common.conf.Workspace; import org.apache.streampark.common.enums.ApplicationType; import org.apache.streampark.common.enums.FlinkDevelopmentMode; @@ -278,10 +278,10 @@ public void setYarnQueueByHotParams() { Map hotParamsMap = this.getHotParamsMap(); if (MapUtils.isNotEmpty(hotParamsMap) - && hotParamsMap.containsKey(ConfigConst.KEY_YARN_APP_QUEUE())) { - String yarnQueue = hotParamsMap.get(ConfigConst.KEY_YARN_APP_QUEUE()).toString(); + && hotParamsMap.containsKey(ConfigKeys.KEY_YARN_APP_QUEUE())) { + String yarnQueue = hotParamsMap.get(ConfigKeys.KEY_YARN_APP_QUEUE()).toString(); String labelExpr = - Optional.ofNullable(hotParamsMap.get(ConfigConst.KEY_YARN_APP_NODE_LABEL())) + Optional.ofNullable(hotParamsMap.get(ConfigKeys.KEY_YARN_APP_NODE_LABEL())) .map(Object::toString) .orElse(null); this.setYarnQueue(YarnQueueLabelExpression.of(yarnQueue, labelExpr).toString()); diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/ApplicationConfig.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/ApplicationConfig.java index b920d6e560..a504148393 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/ApplicationConfig.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/ApplicationConfig.java @@ -17,7 +17,7 @@ package org.apache.streampark.console.core.entity; -import org.apache.streampark.common.conf.ConfigConst; +import org.apache.streampark.common.conf.ConfigKeys; import org.apache.streampark.common.util.DeflaterUtils; import org.apache.streampark.common.util.PropertiesUtils; import org.apache.streampark.console.core.enums.ConfigFileTypeEnum; @@ -103,16 +103,16 @@ public Map readConfig() { Collectors.toMap( entry -> { String key = entry.getKey(); - if (key.startsWith(ConfigConst.KEY_FLINK_OPTION_PREFIX())) { - key = key.substring(ConfigConst.KEY_FLINK_OPTION_PREFIX().length()); - } else if (key.startsWith(ConfigConst.KEY_FLINK_PROPERTY_PREFIX())) { - key = key.substring(ConfigConst.KEY_FLINK_PROPERTY_PREFIX().length()); - } else if (key.startsWith(ConfigConst.KEY_FLINK_TABLE_PREFIX())) { - key = key.substring(ConfigConst.KEY_FLINK_TABLE_PREFIX().length()); - } else if (key.startsWith(ConfigConst.KEY_APP_PREFIX())) { - key = key.substring(ConfigConst.KEY_APP_PREFIX().length()); - } else if (key.startsWith(ConfigConst.KEY_SQL_PREFIX())) { - key = key.substring(ConfigConst.KEY_SQL_PREFIX().length()); + if (key.startsWith(ConfigKeys.KEY_FLINK_OPTION_PREFIX())) { + key = key.substring(ConfigKeys.KEY_FLINK_OPTION_PREFIX().length()); + } else if (key.startsWith(ConfigKeys.KEY_FLINK_PROPERTY_PREFIX())) { + key = key.substring(ConfigKeys.KEY_FLINK_PROPERTY_PREFIX().length()); + } else if (key.startsWith(ConfigKeys.KEY_FLINK_TABLE_PREFIX())) { + key = key.substring(ConfigKeys.KEY_FLINK_TABLE_PREFIX().length()); + } else if (key.startsWith(ConfigKeys.KEY_APP_PREFIX())) { + key = key.substring(ConfigKeys.KEY_APP_PREFIX().length()); + } else if (key.startsWith(ConfigKeys.KEY_SQL_PREFIX())) { + key = key.substring(ConfigKeys.KEY_SQL_PREFIX().length()); } return key; }, diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java index 48f3d8b15c..611fc038eb 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java @@ -17,7 +17,7 @@ package org.apache.streampark.console.core.entity; -import org.apache.streampark.common.conf.ConfigConst; +import org.apache.streampark.common.conf.ConfigKeys; import org.apache.streampark.common.enums.ClusterState; import org.apache.streampark.common.enums.FlinkExecutionMode; import org.apache.streampark.common.enums.FlinkK8sRestExposedType; @@ -138,7 +138,7 @@ public Map getOptionMap() { } Map map = JacksonUtils.read(this.options, Map.class); if (FlinkExecutionMode.YARN_SESSION == getFlinkExecutionModeEnum()) { - map.put(ConfigConst.KEY_YARN_APP_NAME(), this.clusterName); + map.put(ConfigKeys.KEY_YARN_APP_NAME(), this.clusterName); map.putAll(YarnQueueLabelExpression.getQueueLabelMap(yarnQueue)); } map.entrySet().removeIf(entry -> entry.getValue() == null); diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Variable.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Variable.java index 458c74d3db..6d4f1f9e4b 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Variable.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Variable.java @@ -17,7 +17,7 @@ package org.apache.streampark.console.core.entity; -import org.apache.streampark.common.conf.ConfigConst; +import org.apache.streampark.common.Constant; import com.baomidou.mybatisplus.annotation.IdType; import com.baomidou.mybatisplus.annotation.TableId; @@ -70,7 +70,7 @@ public class Variable implements Serializable { public void dataMasking() { if (desensitization) { - this.setVariableValue(ConfigConst.DEFAULT_DATAMASK_STRING()); + this.setVariableValue(Constant.DEFAULT_DATAMASK_STRING); } } } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java index 13d631d4ff..86dc541691 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java @@ -18,7 +18,7 @@ package org.apache.streampark.console.core.runner; import org.apache.streampark.common.conf.CommonConfig; -import org.apache.streampark.common.conf.ConfigConst; +import org.apache.streampark.common.conf.ConfigKeys; import org.apache.streampark.common.conf.InternalConfigHolder; import org.apache.streampark.common.conf.InternalOption; import org.apache.streampark.common.conf.Workspace; @@ -92,14 +92,14 @@ public void run(ApplicationArguments args) throws Exception { + " The system initialization check failed. If started local for development and debugging," + " please ensure the -D%s parameter is clearly specified," + " more detail: https://streampark.apache.org/docs/user-guide/deployment", - ConfigConst.KEY_APP_HOME())); + ConfigKeys.KEY_APP_HOME())); } // init InternalConfig initInternalConfig(context.getEnvironment()); // overwrite system variable HADOOP_USER_NAME String hadoopUserName = InternalConfigHolder.get(CommonConfig.STREAMPARK_HADOOP_USER_NAME()); - overrideSystemProp(ConfigConst.KEY_HADOOP_USER_NAME(), hadoopUserName); + overrideSystemProp(ConfigKeys.KEY_HADOOP_USER_NAME(), hadoopUserName); // initialize local file system resources storageInitialize(LFS); // Launch the embedded http file server. diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java index b60bd0d349..ff07452845 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java @@ -18,7 +18,7 @@ package org.apache.streampark.console.core.service.application.impl; import org.apache.streampark.common.Constant; -import org.apache.streampark.common.conf.ConfigConst; +import org.apache.streampark.common.conf.ConfigKeys; import org.apache.streampark.common.conf.K8sFlinkConfig; import org.apache.streampark.common.conf.Workspace; import org.apache.streampark.common.enums.FlinkDevelopmentMode; @@ -418,7 +418,7 @@ public void start(Application appParam, boolean auto) throws Exception { // Get the sql of the replaced placeholder String realSql = variableService.replaceVariable(application.getTeamId(), flinkSql.getSql()); flinkSql.setSql(DeflaterUtils.zipString(realSql)); - extraParameter.put(ConfigConst.KEY_FLINK_SQL(null), flinkSql.getSql()); + extraParameter.put(ConfigKeys.KEY_FLINK_SQL(null), flinkSql.getSql()); } // TODO Need to display more K8s submission parameters in the front-end UI. @@ -478,12 +478,12 @@ public void start(Application appParam, boolean auto) throws Exception { submitResponse -> { if (submitResponse.flinkConfig() != null) { String jmMemory = - submitResponse.flinkConfig().get(ConfigConst.KEY_FLINK_JM_PROCESS_MEMORY()); + submitResponse.flinkConfig().get(ConfigKeys.KEY_FLINK_JM_PROCESS_MEMORY()); if (jmMemory != null) { application.setJmMemory(MemorySize.parse(jmMemory).getMebiBytes()); } String tmMemory = - submitResponse.flinkConfig().get(ConfigConst.KEY_FLINK_TM_PROCESS_MEMORY()); + submitResponse.flinkConfig().get(ConfigKeys.KEY_FLINK_TM_PROCESS_MEMORY()); if (tmMemory != null) { application.setTmMemory(MemorySize.parse(tmMemory).getMebiBytes()); } @@ -610,7 +610,7 @@ private Tuple2 getUserJarAndAppConf(FlinkEnv flinkEnv, Applicati resource.getFilePath(), "pyflink file can't be null, start application failed."); ApiAlertException.throwIfFalse( - resource.getFilePath().endsWith(ConfigConst.PYTHON_SUFFIX()), + resource.getFilePath().endsWith(Constant.PYTHON_SUFFIX), "pyflink format error, must be a \".py\" suffix, start application failed."); flinkUserJar = resource.getFilePath(); @@ -621,7 +621,7 @@ private Tuple2 getUserJarAndAppConf(FlinkEnv flinkEnv, Applicati appConf = String.format( "json://{\"%s\":\"%s\"}", - ConfigConst.KEY_FLINK_APPLICATION_MAIN_CLASS(), application.getMainClass()); + ConfigKeys.KEY_FLINK_APPLICATION_MAIN_CLASS(), application.getMainClass()); } else { switch (application.getApplicationType()) { case STREAMPARK_FLINK: @@ -639,7 +639,7 @@ private Tuple2 getUserJarAndAppConf(FlinkEnv flinkEnv, Applicati appConf = String.format( "json://{\"%s\":\"%s\"}", - ConfigConst.KEY_FLINK_APPLICATION_MAIN_CLASS(), application.getMainClass()); + ConfigKeys.KEY_FLINK_APPLICATION_MAIN_CLASS(), application.getMainClass()); break; default: throw new IllegalArgumentException( @@ -701,19 +701,19 @@ private Map getProperties(Application application) { "The yarn session clusterId=%s cannot be find, maybe the clusterId is wrong or " + "the cluster has been deleted. Please contact the Admin.", application.getFlinkClusterId())); - properties.put(ConfigConst.KEY_YARN_APP_ID(), cluster.getClusterId()); + properties.put(ConfigKeys.KEY_YARN_APP_ID(), cluster.getClusterId()); } else { String yarnQueue = - (String) application.getHotParamsMap().get(ConfigConst.KEY_YARN_APP_QUEUE()); + (String) application.getHotParamsMap().get(ConfigKeys.KEY_YARN_APP_QUEUE()); String yarnLabelExpr = - (String) application.getHotParamsMap().get(ConfigConst.KEY_YARN_APP_NODE_LABEL()); + (String) application.getHotParamsMap().get(ConfigKeys.KEY_YARN_APP_NODE_LABEL()); Optional.ofNullable(yarnQueue) - .ifPresent(yq -> properties.put(ConfigConst.KEY_YARN_APP_QUEUE(), yq)); + .ifPresent(yq -> properties.put(ConfigKeys.KEY_YARN_APP_QUEUE(), yq)); Optional.ofNullable(yarnLabelExpr) - .ifPresent(yLabel -> properties.put(ConfigConst.KEY_YARN_APP_NODE_LABEL(), yLabel)); + .ifPresent(yLabel -> properties.put(ConfigKeys.KEY_YARN_APP_NODE_LABEL(), yLabel)); } } else if (FlinkExecutionMode.isKubernetesMode(application.getFlinkExecutionMode())) { - properties.put(ConfigConst.KEY_K8S_IMAGE_PULL_POLICY(), "Always"); + properties.put(ConfigKeys.KEY_K8S_IMAGE_PULL_POLICY(), "Always"); } if (FlinkExecutionMode.isKubernetesApplicationMode(application.getExecutionMode())) { diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java index 222dc56bf0..15946b7f65 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java @@ -18,7 +18,6 @@ package org.apache.streampark.console.core.service.impl; import org.apache.streampark.common.Constant; -import org.apache.streampark.common.conf.ConfigConst; import org.apache.streampark.common.conf.K8sFlinkConfig; import org.apache.streampark.common.conf.Workspace; import org.apache.streampark.common.enums.ApplicationType; @@ -441,7 +440,7 @@ private BuildPipeline createPipelineInstance(@Nonnull Application app) { } FlinkExecutionMode executionModeEnum = app.getFlinkExecutionMode(); - String mainClass = ConfigConst.STREAMPARK_FLINKSQL_CLIENT_CLASS(); + String mainClass = Constant.STREAMPARK_FLINKSQL_CLIENT_CLASS; switch (executionModeEnum) { case YARN_APPLICATION: String yarnProvidedPath = app.getAppLib(); diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java index 47b1136160..c1b4b4564c 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java @@ -17,7 +17,7 @@ package org.apache.streampark.console.core.service.impl; -import org.apache.streampark.common.conf.ConfigConst; +import org.apache.streampark.common.Constant; import org.apache.streampark.common.conf.Workspace; import org.apache.streampark.common.fs.FsOperator; import org.apache.streampark.common.util.ExceptionUtils; @@ -289,7 +289,7 @@ public RestResponse checkResource(Resource resourceParam) throws JsonProcessingE resp.put(EXCEPTION, ExceptionUtils.stringifyException(e)); return RestResponse.success().data(resp); } - if (jarFile.getName().endsWith(ConfigConst.PYTHON_SUFFIX())) { + if (jarFile.getName().endsWith(Constant.PYTHON_SUFFIX)) { return RestResponse.success().data(resp); } Manifest manifest = Utils.getJarManifest(jarFile); diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/utils/YarnQueueLabelExpression.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/utils/YarnQueueLabelExpression.java index e44482b580..ab0a35bea6 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/utils/YarnQueueLabelExpression.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/utils/YarnQueueLabelExpression.java @@ -17,7 +17,7 @@ package org.apache.streampark.console.core.utils; -import org.apache.streampark.common.conf.ConfigConst; +import org.apache.streampark.common.conf.ConfigKeys; import org.apache.streampark.console.base.exception.ApiAlertException; import org.apache.commons.lang3.StringUtils; @@ -111,8 +111,8 @@ public static Map getQueueLabelMap(String queueLabelExp) { Map map = new HashMap<>(2); yarnQueueLabelExpression .getLabelExpression() - .ifPresent(labelExp -> map.put(ConfigConst.KEY_YARN_APP_NODE_LABEL(), labelExp)); - map.put(ConfigConst.KEY_YARN_APP_QUEUE(), yarnQueueLabelExpression.queue); + .ifPresent(labelExp -> map.put(ConfigKeys.KEY_YARN_APP_NODE_LABEL(), labelExp)); + map.put(ConfigKeys.KEY_YARN_APP_QUEUE(), yarnQueueLabelExpression.queue); return map; } } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/User.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/User.java index 88a2f57339..1699e0b6e6 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/User.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/User.java @@ -17,7 +17,7 @@ package org.apache.streampark.console.system.entity; -import org.apache.streampark.common.conf.ConfigConst; +import org.apache.streampark.common.Constant; import org.apache.streampark.console.core.enums.LoginTypeEnum; import org.apache.streampark.console.core.enums.UserTypeEnum; @@ -101,7 +101,7 @@ public class User implements Serializable { private Long lastTeamId; public void dataMasking() { - String dataMask = ConfigConst.DEFAULT_DATAMASK_STRING(); + String dataMask = Constant.DEFAULT_DATAMASK_STRING; this.setPassword(dataMask); this.setSalt(dataMask); } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/runner/StartedUpRunner.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/runner/StartedUpRunner.java index 4a19b709f7..3228c2266f 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/runner/StartedUpRunner.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/runner/StartedUpRunner.java @@ -17,7 +17,7 @@ package org.apache.streampark.console.system.runner; -import org.apache.streampark.common.conf.ConfigConst; +import org.apache.streampark.common.util.Utils; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -37,7 +37,7 @@ public class StartedUpRunner implements ApplicationRunner { @Override public void run(ApplicationArguments args) { if (context.isActive()) { - ConfigConst.printLogo("streampark-console start successful"); + Utils.printLogo("streampark-console start successful"); } } } diff --git a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/SpringIntegrationTestBase.java b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/SpringIntegrationTestBase.java index 8780e221eb..398fce3d5e 100644 --- a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/SpringIntegrationTestBase.java +++ b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/SpringIntegrationTestBase.java @@ -18,7 +18,7 @@ package org.apache.streampark.console; import org.apache.streampark.common.conf.CommonConfig; -import org.apache.streampark.common.conf.ConfigConst; +import org.apache.streampark.common.conf.ConfigKeys; import org.apache.streampark.common.util.SystemPropertyUtils; import org.apache.commons.io.FileUtils; @@ -91,7 +91,7 @@ public static void init(@TempDir File tempPath) throws IOException { Files.createDirectories(new File(tempAbsPath, DEFAULT_LOCAL_WORKSPACE_DIR_NAME).toPath()); appHome = new File(tempAbsPath, DEFAULT_APP_HOME_DIR_NAME).getAbsolutePath(); - System.setProperty(ConfigConst.KEY_APP_HOME(), appHome); + System.setProperty(ConfigKeys.KEY_APP_HOME(), appHome); System.setProperty( CommonConfig.STREAMPARK_WORKSPACE_LOCAL().key(), localWorkspace.toAbsolutePath().toString()); diff --git a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/SpringUnitTestBase.java b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/SpringUnitTestBase.java index 5e40b31228..88f5284d80 100644 --- a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/SpringUnitTestBase.java +++ b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/SpringUnitTestBase.java @@ -18,7 +18,7 @@ package org.apache.streampark.console; import org.apache.streampark.common.conf.CommonConfig; -import org.apache.streampark.common.conf.ConfigConst; +import org.apache.streampark.common.conf.ConfigKeys; import org.apache.streampark.common.enums.FlinkExecutionMode; import org.apache.streampark.console.core.entity.Application; import org.apache.streampark.console.core.entity.FlinkCluster; @@ -68,7 +68,7 @@ public static void init(@TempDir File tempPath) throws IOException { Path localWorkspace = Files.createDirectories(new File(mockedHome + "/localWorkspace").toPath()); - System.setProperty(ConfigConst.KEY_APP_HOME(), mockedHome); + System.setProperty(ConfigKeys.KEY_APP_HOME(), mockedHome); System.setProperty( CommonConfig.STREAMPARK_WORKSPACE_LOCAL().key(), localWorkspace.toAbsolutePath().toString()); diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala index 18665186f9..25e17120d4 100644 --- a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala @@ -17,8 +17,9 @@ package org.apache.streampark.flink.client.bean -import org.apache.streampark.common.conf.{ConfigConst, FlinkVersion, Workspace} -import org.apache.streampark.common.conf.ConfigConst._ +import org.apache.streampark.common.Constant +import org.apache.streampark.common.conf.{ConfigKeys, FlinkVersion, Workspace} +import org.apache.streampark.common.conf.ConfigKeys._ import org.apache.streampark.common.enums._ import org.apache.streampark.common.util.{DeflaterUtils, HdfsUtils, PropertiesUtils} import org.apache.streampark.flink.packer.pipeline.{BuildResult, ShadedBuildResponse} @@ -59,8 +60,8 @@ case class SubmitRequest( lazy val appOption: Map[String, String] = getParameterMap(KEY_FLINK_OPTION_PREFIX) lazy val appMain: String = this.developmentMode match { - case FlinkDevelopmentMode.FLINK_SQL => ConfigConst.STREAMPARK_FLINKSQL_CLIENT_CLASS - case FlinkDevelopmentMode.PYFLINK => ConfigConst.PYTHON_DRIVER_CLASS_NAME + case FlinkDevelopmentMode.FLINK_SQL => Constant.STREAMPARK_FLINKSQL_CLIENT_CLASS + case FlinkDevelopmentMode.PYFLINK => Constant.PYTHON_FLINK_DRIVER_CLASS_NAME case _ => appProperties(KEY_FLINK_APPLICATION_MAIN_CLASS) } diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala index f39fa341ab..f5862a082e 100644 --- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala @@ -17,7 +17,8 @@ package org.apache.streampark.flink.client.impl -import org.apache.streampark.common.conf.{ConfigConst, Workspace} +import org.apache.streampark.common.Constant +import org.apache.streampark.common.conf.{ConfigKeys, Workspace} import org.apache.streampark.common.enums.FlinkDevelopmentMode import org.apache.streampark.common.fs.FsOperator import org.apache.streampark.common.util.{FileUtils, HdfsUtils, Utils} @@ -117,15 +118,15 @@ object YarnApplicationClient extends YarnClientTrait { // python.archives .safeSet(PythonOptions.PYTHON_ARCHIVES, pyVenv) // python.client.executable - .safeSet(PythonOptions.PYTHON_CLIENT_EXECUTABLE, ConfigConst.PYTHON_EXECUTABLE) + .safeSet(PythonOptions.PYTHON_CLIENT_EXECUTABLE, Constant.PYTHON_EXECUTABLE) // python.executable - .safeSet(PythonOptions.PYTHON_EXECUTABLE, ConfigConst.PYTHON_EXECUTABLE) + .safeSet(PythonOptions.PYTHON_EXECUTABLE, Constant.PYTHON_EXECUTABLE) val args: util.List[String] = flinkConfig.get(ApplicationConfiguration.APPLICATION_ARGS) // Caused by: java.lang.UnsupportedOperationException val argsList: util.ArrayList[String] = new util.ArrayList[String](args) argsList.add("-pym") - argsList.add(submitRequest.userJarFile.getName.dropRight(ConfigConst.PYTHON_SUFFIX.length)) + argsList.add(submitRequest.userJarFile.getName.dropRight(Constant.PYTHON_SUFFIX.length)) flinkConfig.safeSet(ApplicationConfiguration.APPLICATION_ARGS, argsList) } diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala index 7fb6aa1483..75a05326f4 100644 --- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala @@ -17,8 +17,9 @@ package org.apache.streampark.flink.client.`trait` -import org.apache.streampark.common.conf.{ConfigConst, Workspace} -import org.apache.streampark.common.conf.ConfigConst._ +import org.apache.streampark.common.Constant +import org.apache.streampark.common.conf.{ConfigKeys, Workspace} +import org.apache.streampark.common.conf.ConfigKeys._ import org.apache.streampark.common.enums.{ApplicationType, FlinkDevelopmentMode, FlinkExecutionMode, FlinkRestoreMode} import org.apache.streampark.common.fs.FsOperator import org.apache.streampark.common.util.{DeflaterUtils, FileUtils, Logger, SystemPropertyUtils} @@ -251,9 +252,9 @@ trait FlinkClientTrait extends Logger { // python.archives .safeSet(PythonOptions.PYTHON_ARCHIVES, pythonVenv) // python.client.executable - .safeSet(PythonOptions.PYTHON_CLIENT_EXECUTABLE, ConfigConst.PYTHON_EXECUTABLE) + .safeSet(PythonOptions.PYTHON_CLIENT_EXECUTABLE, Constant.PYTHON_EXECUTABLE) // python.executable - .safeSet(PythonOptions.PYTHON_EXECUTABLE, ConfigConst.PYTHON_EXECUTABLE) + .safeSet(PythonOptions.PYTHON_EXECUTABLE, Constant.PYTHON_EXECUTABLE) } val packageProgram = PackagedProgram.newBuilder diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-clickhouse/src/main/scala/org/apache/streampark/flink/connector/clickhouse/conf/ClickHouseHttpConfig.scala b/streampark-flink/streampark-flink-connector/streampark-flink-connector-clickhouse/src/main/scala/org/apache/streampark/flink/connector/clickhouse/conf/ClickHouseHttpConfig.scala index c75eb1790b..041dbe5ac9 100644 --- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-clickhouse/src/main/scala/org/apache/streampark/flink/connector/clickhouse/conf/ClickHouseHttpConfig.scala +++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-clickhouse/src/main/scala/org/apache/streampark/flink/connector/clickhouse/conf/ClickHouseHttpConfig.scala @@ -17,9 +17,9 @@ package org.apache.streampark.flink.connector.clickhouse.conf -import org.apache.streampark.common.conf.ConfigConst import org.apache.streampark.flink.connector.conf.ThresholdConf +import org.apache.streampark.common.Constant import java.util.{Base64, Properties} import java.util.concurrent.ThreadLocalRandom @@ -69,7 +69,7 @@ class ClickHouseHttpConfig(parameters: Properties) override def toString: String = { s""" - |{ user: $user, password: ${ConfigConst.DEFAULT_DATAMASK_STRING}, hosts: ${hosts.mkString(",")} } + |{ user: $user, password: ${Constant.DEFAULT_DATAMASK_STRING}, hosts: ${hosts.mkString(",")} } |""".stripMargin } } diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/scala/org/apache/streampark/connector/doris/conf/DorisConfig.scala b/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/scala/org/apache/streampark/connector/doris/conf/DorisConfig.scala index 40a97ff1e2..df09056c1a 100644 --- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/scala/org/apache/streampark/connector/doris/conf/DorisConfig.scala +++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/scala/org/apache/streampark/connector/doris/conf/DorisConfig.scala @@ -16,10 +16,10 @@ */ package org.apache.streampark.connector.doris.conf -import org.apache.streampark.common.conf.ConfigConst +import org.apache.streampark.common.Constant +import org.apache.streampark.common.conf.ConfigKeys import java.util.Properties - import scala.collection.convert.ImplicitConversions._ object DorisConfig { @@ -83,7 +83,7 @@ class DorisConfig(parameters: Properties) { override def toString: String = { s""" - |{ doris user: $user, password: ${ConfigConst.DEFAULT_DATAMASK_STRING}, hosts: ${loadUrl.mkString(",")} } + |{ doris user: $user, password: ${Constant.DEFAULT_DATAMASK_STRING}, hosts: ${loadUrl.mkString(",")} } |""".stripMargin } } diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-influx/src/main/scala/org/apache/streampark/flink/connector/influx/function/InfluxFunction.scala b/streampark-flink/streampark-flink-connector/streampark-flink-connector-influx/src/main/scala/org/apache/streampark/flink/connector/influx/function/InfluxFunction.scala index 41e8e324e0..c3565c9a23 100644 --- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-influx/src/main/scala/org/apache/streampark/flink/connector/influx/function/InfluxFunction.scala +++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-influx/src/main/scala/org/apache/streampark/flink/connector/influx/function/InfluxFunction.scala @@ -17,7 +17,7 @@ package org.apache.streampark.flink.connector.influx.function -import org.apache.streampark.common.conf.ConfigConst.{KEY_JDBC_PASSWORD, KEY_JDBC_URL, KEY_JDBC_USER} +import org.apache.streampark.common.conf.ConfigKeys.{KEY_JDBC_PASSWORD, KEY_JDBC_URL, KEY_JDBC_USER} import org.apache.streampark.common.enums.ApiType import org.apache.streampark.common.util.Logger import org.apache.streampark.flink.connector.influx.bean.InfluxEntity diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/scala/org/apache/streampark/flink/connector/jdbc/internal/JdbcSinkFunction.scala b/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/scala/org/apache/streampark/flink/connector/jdbc/internal/JdbcSinkFunction.scala index f00647dd3b..6fb365e316 100644 --- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/scala/org/apache/streampark/flink/connector/jdbc/internal/JdbcSinkFunction.scala +++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/scala/org/apache/streampark/flink/connector/jdbc/internal/JdbcSinkFunction.scala @@ -17,7 +17,7 @@ package org.apache.streampark.flink.connector.jdbc.internal -import org.apache.streampark.common.conf.ConfigConst.{DEFAULT_JDBC_INSERT_BATCH, KEY_JDBC_INSERT_BATCH} +import org.apache.streampark.common.conf.ConfigKeys.{DEFAULT_JDBC_INSERT_BATCH, KEY_JDBC_INSERT_BATCH} import org.apache.streampark.common.enums.ApiType import org.apache.streampark.common.enums.ApiType.ApiType import org.apache.streampark.common.util.{JdbcUtils, Logger} diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/scala/org/apache/streampark/flink/connector/jdbc/sink/JdbcSink.scala b/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/scala/org/apache/streampark/flink/connector/jdbc/sink/JdbcSink.scala index 1303e36612..c5ca134e2c 100644 --- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/scala/org/apache/streampark/flink/connector/jdbc/sink/JdbcSink.scala +++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/scala/org/apache/streampark/flink/connector/jdbc/sink/JdbcSink.scala @@ -17,7 +17,7 @@ package org.apache.streampark.flink.connector.jdbc.sink -import org.apache.streampark.common.conf.ConfigConst._ +import org.apache.streampark.common.conf.ConfigKeys._ import org.apache.streampark.common.enums.Semantic import org.apache.streampark.common.util.{ConfigUtils, Logger} import org.apache.streampark.flink.connector.jdbc.internal.{Jdbc2PCSinkFunction, JdbcSinkFunction} diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-kafka/src/main/scala/org/apache/streampark/flink/connector/kafka/sink/KafkaSink.scala b/streampark-flink/streampark-flink-connector/streampark-flink-connector-kafka/src/main/scala/org/apache/streampark/flink/connector/kafka/sink/KafkaSink.scala index 9bfe176e03..8a60922a8b 100644 --- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-kafka/src/main/scala/org/apache/streampark/flink/connector/kafka/sink/KafkaSink.scala +++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-kafka/src/main/scala/org/apache/streampark/flink/connector/kafka/sink/KafkaSink.scala @@ -17,7 +17,7 @@ package org.apache.streampark.flink.connector.kafka.sink -import org.apache.streampark.common.conf.ConfigConst +import org.apache.streampark.common.conf.ConfigKeys import org.apache.streampark.common.util.{ConfigUtils, Utils} import org.apache.streampark.flink.connector.kafka.bean.KafkaEqualityPartitioner import org.apache.streampark.flink.connector.sink.Sink @@ -83,10 +83,10 @@ class KafkaSink( val producer = { val prop = ConfigUtils.getKafkaSinkConf(ctx.parameter.toMap, topic, alias) Utils.copyProperties(property, prop) - val topicId = prop.remove(ConfigConst.KEY_KAFKA_TOPIC).toString + val topicId = prop.remove(ConfigKeys.KEY_KAFKA_TOPIC).toString /** kafkaProducersPoolSize will be used under EXACTLY_ONCE semantics */ - val semantic = Try(Some(prop.remove(ConfigConst.KEY_KAFKA_SEMANTIC).toString.toUpperCase)) + val semantic = Try(Some(prop.remove(ConfigKeys.KEY_KAFKA_SEMANTIC).toString.toUpperCase)) .getOrElse(None) match { case None => Semantic.AT_LEAST_ONCE case Some("AT_LEAST_ONCE") => Semantic.AT_LEAST_ONCE diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-kafka/src/main/scala/org/apache/streampark/flink/connector/kafka/source/KafkaSource.scala b/streampark-flink/streampark-flink-connector/streampark-flink-connector-kafka/src/main/scala/org/apache/streampark/flink/connector/kafka/source/KafkaSource.scala index 05e8e7b8ce..aea660284e 100644 --- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-kafka/src/main/scala/org/apache/streampark/flink/connector/kafka/source/KafkaSource.scala +++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-kafka/src/main/scala/org/apache/streampark/flink/connector/kafka/source/KafkaSource.scala @@ -17,7 +17,7 @@ package org.apache.streampark.flink.connector.kafka.source -import org.apache.streampark.common.conf.ConfigConst._ +import org.apache.streampark.common.conf.ConfigKeys._ import org.apache.streampark.common.util.{ConfigUtils, Utils} import org.apache.streampark.flink.connector.kafka.bean.KafkaRecord import org.apache.streampark.flink.core.scala.StreamingContext @@ -25,7 +25,7 @@ import org.apache.streampark.flink.core.scala.StreamingContext import org.apache.flink.api.common.eventtime.WatermarkStrategy import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} import org.apache.flink.api.java.typeutils.TypeExtractor.getForClass -import org.apache.flink.streaming.api.scala.{DataStream, _} +import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, KafkaDeserializationSchema} import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord} diff --git a/streampark-flink/streampark-flink-core/src/main/scala/org/apache/streampark/flink/core/scala/FlinkStreamTable.scala b/streampark-flink/streampark-flink-core/src/main/scala/org/apache/streampark/flink/core/scala/FlinkStreamTable.scala index c7bfbd1554..b480231c0a 100644 --- a/streampark-flink/streampark-flink-core/src/main/scala/org/apache/streampark/flink/core/scala/FlinkStreamTable.scala +++ b/streampark-flink/streampark-flink-core/src/main/scala/org/apache/streampark/flink/core/scala/FlinkStreamTable.scala @@ -17,7 +17,7 @@ package org.apache.streampark.flink.core.scala -import org.apache.streampark.common.conf.ConfigConst._ +import org.apache.streampark.common.conf.ConfigKeys._ import org.apache.streampark.common.util.{Logger, SystemPropertyUtils} import org.apache.streampark.flink.core.{FlinkTableInitializer, StreamTableContext} import org.apache.streampark.flink.core.TableExt diff --git a/streampark-flink/streampark-flink-core/src/main/scala/org/apache/streampark/flink/core/scala/FlinkStreaming.scala b/streampark-flink/streampark-flink-core/src/main/scala/org/apache/streampark/flink/core/scala/FlinkStreaming.scala index 7dffccd299..8d5dca940b 100644 --- a/streampark-flink/streampark-flink-core/src/main/scala/org/apache/streampark/flink/core/scala/FlinkStreaming.scala +++ b/streampark-flink/streampark-flink-core/src/main/scala/org/apache/streampark/flink/core/scala/FlinkStreaming.scala @@ -17,8 +17,8 @@ package org.apache.streampark.flink.core.scala -import org.apache.streampark.common.conf.ConfigConst._ -import org.apache.streampark.common.util.{Logger, SystemPropertyUtils} +import org.apache.streampark.common.conf.ConfigKeys._ +import org.apache.streampark.common.util.{Logger, SystemPropertyUtils, Utils} import org.apache.streampark.flink.core.{FlinkStreamingInitializer, StreamEnvConfig} import org.apache.streampark.flink.core.EnhancerImplicit._ @@ -50,7 +50,7 @@ class StreamingContext( } @Deprecated override def execute(jobName: String): JobExecutionResult = { - printLogo(s"FlinkStreaming $jobName Starting...") + Utils.printLogo(s"FlinkStreaming $jobName Starting...") super.execute(jobName) } } diff --git a/streampark-flink/streampark-flink-core/src/main/scala/org/apache/streampark/flink/core/scala/FlinkTable.scala b/streampark-flink/streampark-flink-core/src/main/scala/org/apache/streampark/flink/core/scala/FlinkTable.scala index 78ead4fb28..44ed2b9728 100644 --- a/streampark-flink/streampark-flink-core/src/main/scala/org/apache/streampark/flink/core/scala/FlinkTable.scala +++ b/streampark-flink/streampark-flink-core/src/main/scala/org/apache/streampark/flink/core/scala/FlinkTable.scala @@ -17,7 +17,7 @@ package org.apache.streampark.flink.core.scala -import org.apache.streampark.common.conf.ConfigConst._ +import org.apache.streampark.common.conf.ConfigKeys._ import org.apache.streampark.common.util.{Logger, SystemPropertyUtils} import org.apache.streampark.flink.core.{FlinkTableInitializer, TableContext} diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala index edd679ad76..ee67a202de 100644 --- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala +++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala @@ -17,7 +17,7 @@ package org.apache.streampark.flink.kubernetes -import org.apache.streampark.common.conf.ConfigConst +import org.apache.streampark.common.conf.ConfigKeys import org.apache.streampark.common.util.ImplicitsUtils._ import org.apache.streampark.common.util.Logger import org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteModeEnum @@ -116,7 +116,7 @@ object KubernetesRetriever extends Logger { .apps() .deployments() .inNamespace(namespace) - .withLabel("type", ConfigConst.FLINK_NATIVE_KUBERNETES_LABEL) + .withLabel("type", ConfigKeys.FLINK_NATIVE_KUBERNETES_LABEL) .list() .getItems .asScala diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategy.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategy.scala index e97fcfc1f5..5cba3b36f3 100644 --- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategy.scala +++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategy.scala @@ -17,7 +17,7 @@ package org.apache.streampark.flink.kubernetes.ingress -import org.apache.streampark.common.conf.{ConfigConst, InternalConfigHolder, K8sFlinkConfig} +import org.apache.streampark.common.conf.{ConfigKeys, InternalConfigHolder, K8sFlinkConfig} import io.fabric8.kubernetes.api.model.{OwnerReference, OwnerReferenceBuilder} import io.fabric8.kubernetes.client.DefaultKubernetesClient @@ -63,7 +63,7 @@ trait IngressStrategy { def buildIngressLabels(clusterId: String): Map[String, String] = { Map( "app" -> clusterId, - "type" -> ConfigConst.FLINK_NATIVE_KUBERNETES_LABEL, + "type" -> ConfigKeys.FLINK_NATIVE_KUBERNETES_LABEL, "component" -> "ingress" ) } diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkK8sEventWatcher.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkK8sEventWatcher.scala index f49bf4406f..4e3681a81a 100644 --- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkK8sEventWatcher.scala +++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkK8sEventWatcher.scala @@ -17,7 +17,7 @@ package org.apache.streampark.flink.kubernetes.watcher -import org.apache.streampark.common.conf.ConfigConst +import org.apache.streampark.common.conf.ConfigKeys import org.apache.streampark.common.util.Logger import org.apache.streampark.flink.kubernetes.{FlinkK8sWatchController, KubernetesRetriever} import org.apache.streampark.flink.kubernetes.model.{K8sDeploymentEventCV, K8sEventKey} @@ -67,7 +67,7 @@ class FlinkK8sEventWatcher(implicit watchController: FlinkK8sWatchController) k8sClient .apps() .deployments() - .withLabel("type", ConfigConst.FLINK_NATIVE_KUBERNETES_LABEL) + .withLabel("type", ConfigKeys.FLINK_NATIVE_KUBERNETES_LABEL) .watch(new CompatibleKubernetesWatcher[Deployment, CompKubernetesDeployment] { override def eventReceived(action: Watcher.Action, event: Deployment): Unit = { handleDeploymentEvent(action, event) diff --git a/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/FlinkShimsProxy.scala b/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/FlinkShimsProxy.scala index 2a8050f357..de3b9eaed3 100644 --- a/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/FlinkShimsProxy.scala +++ b/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/FlinkShimsProxy.scala @@ -18,7 +18,7 @@ package org.apache.streampark.flink.proxy import org.apache.streampark.common.Constant -import org.apache.streampark.common.conf.{ConfigConst, FlinkVersion} +import org.apache.streampark.common.conf.{ConfigKeys, FlinkVersion} import org.apache.streampark.common.util.{ClassLoaderUtils, Logger, Utils} import org.apache.streampark.common.util.ImplicitsUtils._ @@ -108,10 +108,10 @@ object FlinkShimsProxy extends Logger { } def addShimsUrls(flinkVersion: FlinkVersion, addShimUrl: File => Unit): Unit = { - val appHome = System.getProperty(ConfigConst.KEY_APP_HOME) + val appHome = System.getProperty(ConfigKeys.KEY_APP_HOME) require( appHome != null, - String.format("%s is not found on System env.", ConfigConst.KEY_APP_HOME)) + String.format("%s is not found on System env.", ConfigKeys.KEY_APP_HOME)) val libPath = new File(s"$appHome/lib") require(libPath.exists()) diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/EnhancerImplicit.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/EnhancerImplicit.scala index 528e479070..d855ee0302 100644 --- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/EnhancerImplicit.scala +++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/EnhancerImplicit.scala @@ -17,7 +17,7 @@ package org.apache.streampark.flink.core -import org.apache.streampark.common.conf.ConfigConst.{KEY_APP_NAME, KEY_FLINK_APP_NAME} +import org.apache.streampark.common.conf.ConfigKeys.{KEY_APP_NAME, KEY_FLINK_APP_NAME} import org.apache.streampark.common.util.DeflaterUtils import org.apache.flink.api.java.utils.ParameterTool diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlExecutor.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlExecutor.scala index 8d18a31e25..9d6a09339b 100644 --- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlExecutor.scala +++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlExecutor.scala @@ -16,7 +16,7 @@ */ package org.apache.streampark.flink.core -import org.apache.streampark.common.conf.ConfigConst.KEY_FLINK_SQL +import org.apache.streampark.common.conf.ConfigKeys.KEY_FLINK_SQL import org.apache.streampark.common.util.Logger import org.apache.streampark.flink.core.SqlCommand._ diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamTableTrait.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamTableTrait.scala index 53e3812753..517ae1ae6d 100644 --- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamTableTrait.scala +++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamTableTrait.scala @@ -16,7 +16,8 @@ */ package org.apache.streampark.flink.core -import org.apache.streampark.common.conf.ConfigConst._ +import org.apache.streampark.common.conf.ConfigKeys._ +import org.apache.streampark.common.util.Utils import org.apache.streampark.flink.core.EnhancerImplicit._ import com.esotericsoftware.kryo.Serializer @@ -77,7 +78,7 @@ abstract class FlinkStreamTableTrait( } @Deprecated def execute(jobName: String): JobExecutionResult = { - printLogo(s"FlinkStreamTable $jobName Starting...") + Utils.printLogo(s"FlinkStreamTable $jobName Starting...") if (isConvertedToDataStream) { streamEnv.execute(jobName) } else null diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala index 87a54bd075..bfb4be1cc4 100644 --- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala +++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala @@ -16,7 +16,7 @@ */ package org.apache.streampark.flink.core -import org.apache.streampark.common.conf.ConfigConst._ +import org.apache.streampark.common.conf.ConfigKeys._ import org.apache.streampark.common.enums.ApiType import org.apache.streampark.common.enums.ApiType.ApiType import org.apache.streampark.common.util._ diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala index 2169b8ea58..ce8c938231 100644 --- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala +++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala @@ -16,7 +16,7 @@ */ package org.apache.streampark.flink.core -import org.apache.streampark.common.conf.ConfigConst._ +import org.apache.streampark.common.conf.ConfigKeys._ import org.apache.streampark.common.enums.{ApiType, PlannerType} import org.apache.streampark.common.enums.ApiType.ApiType import org.apache.streampark.common.util.{DeflaterUtils, PropertiesUtils} diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableTrait.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableTrait.scala index e581f857ca..71b1a1c487 100644 --- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableTrait.scala +++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableTrait.scala @@ -16,7 +16,8 @@ */ package org.apache.streampark.flink.core -import org.apache.streampark.common.conf.ConfigConst._ +import org.apache.streampark.common.conf.ConfigKeys._ +import org.apache.streampark.common.util.Utils import org.apache.streampark.flink.core.EnhancerImplicit._ import org.apache.flink.api.common.JobExecutionResult @@ -40,7 +41,7 @@ abstract class FlinkTableTrait(val parameter: ParameterTool, private val tableEn } def execute(jobName: String): JobExecutionResult = { - printLogo(s"FlinkTable $jobName Starting...") + Utils.printLogo(s"FlinkTable $jobName Starting...") null } diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/SqlCommandParser.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/SqlCommandParser.scala index dc4de596c4..157216ce8d 100644 --- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/SqlCommandParser.scala +++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/SqlCommandParser.scala @@ -16,7 +16,7 @@ */ package org.apache.streampark.flink.core -import org.apache.streampark.common.conf.ConfigConst.PARAM_PREFIX +import org.apache.streampark.common.conf.ConfigKeys.PARAM_PREFIX import org.apache.streampark.common.enums.FlinkSqlValidationFailedType import org.apache.streampark.common.util.Logger diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/conf/ParameterCli.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/conf/ParameterCli.scala index 1206c817e9..3331253b3e 100644 --- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/conf/ParameterCli.scala +++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/conf/ParameterCli.scala @@ -16,8 +16,8 @@ */ package org.apache.streampark.flink.core.conf -import org.apache.streampark.common.conf.ConfigConst -import org.apache.streampark.common.conf.ConfigConst.{KEY_FLINK_OPTION_PREFIX, KEY_FLINK_PROPERTY_PREFIX} +import org.apache.streampark.common.conf.ConfigKeys +import org.apache.streampark.common.conf.ConfigKeys.{KEY_FLINK_OPTION_PREFIX, KEY_FLINK_PROPERTY_PREFIX} import org.apache.streampark.common.util.PropertiesUtils import org.apache.commons.cli.{DefaultParser, Options} @@ -96,7 +96,7 @@ object ParameterCli { x => val key = x._1.drop(propertyPrefix.length).trim val value = x._2.trim - if (key == ConfigConst.KEY_FLINK_APP_NAME) { + if (key == ConfigKeys.KEY_FLINK_APP_NAME) { buffer.append(s" -D$key=${value.replace(" ", "_")}") } else { buffer.append(s" -D$key=$value") @@ -104,7 +104,7 @@ object ParameterCli { } buffer.toString.trim case "--name" => - map.getOrElse(propertyPrefix.concat(ConfigConst.KEY_FLINK_APP_NAME), "").trim match { + map.getOrElse(propertyPrefix.concat(ConfigKeys.KEY_FLINK_APP_NAME), "").trim match { case appName if appName.nonEmpty => appName case _ => "" } diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-test/src/test/scala/org/apache/streampark/flink/core/test/FlinkSqlExecuteFunSuite.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-test/src/test/scala/org/apache/streampark/flink/core/test/FlinkSqlExecuteFunSuite.scala index a681018a39..ed6040fa33 100644 --- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-test/src/test/scala/org/apache/streampark/flink/core/test/FlinkSqlExecuteFunSuite.scala +++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-test/src/test/scala/org/apache/streampark/flink/core/test/FlinkSqlExecuteFunSuite.scala @@ -16,7 +16,7 @@ */ package org.apache.streampark.flink.core.test -import org.apache.streampark.common.conf.ConfigConst.{KEY_FLINK_SQL, PARAM_PREFIX} +import org.apache.streampark.common.conf.ConfigKeys.{KEY_FLINK_SQL, PARAM_PREFIX} import org.apache.streampark.common.util.DeflaterUtils import org.apache.streampark.flink.core.{FlinkSqlExecutor, FlinkTableInitializer, StreamTableContext} diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/TableContext.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/TableContext.scala index bbe35967ce..f7aedd8ea3 100644 --- a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/TableContext.scala +++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/TableContext.scala @@ -16,7 +16,7 @@ */ package org.apache.streampark.flink.core -import org.apache.streampark.common.conf.ConfigConst.printLogo +import org.apache.streampark.common.util.Utils import org.apache.flink.api.common.JobExecutionResult import org.apache.flink.api.java.utils.ParameterTool @@ -46,7 +46,7 @@ class TableContext(override val parameter: ParameterTool, private val tableEnv: tableEnv.connect(connectorDescriptor) override def execute(jobName: String): JobExecutionResult = { - printLogo(s"FlinkTable $jobName Starting...") + Utils.printLogo(s"FlinkTable $jobName Starting...") null } diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/TableContext.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/TableContext.scala index 96ac485496..f0f2ad1358 100644 --- a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/TableContext.scala +++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/TableContext.scala @@ -16,7 +16,7 @@ */ package org.apache.streampark.flink.core -import org.apache.streampark.common.conf.ConfigConst.printLogo +import org.apache.streampark.common.util.Utils import org.apache.flink.api.common.JobExecutionResult import org.apache.flink.api.java.utils.ParameterTool @@ -51,7 +51,7 @@ class TableContext(override val parameter: ParameterTool, private val tableEnv: tableEnv.connect(connectorDescriptor) override def execute(jobName: String): JobExecutionResult = { - printLogo(s"FlinkTable $jobName Starting...") + Utils.printLogo(s"FlinkTable $jobName Starting...") null } diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/TableContext.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/TableContext.scala index fcd7708f3e..d0815b5cbd 100644 --- a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/TableContext.scala +++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/TableContext.scala @@ -16,7 +16,7 @@ */ package org.apache.streampark.flink.core -import org.apache.streampark.common.conf.ConfigConst.printLogo +import org.apache.streampark.common.util.Utils import org.apache.flink.api.common.JobExecutionResult import org.apache.flink.api.java.utils.ParameterTool @@ -64,7 +64,7 @@ class TableContext(override val parameter: ParameterTool, private val tableEnv: def from(descriptor: TableDescriptor): Table = tableEnv.from(descriptor) @Deprecated override def execute(jobName: String): JobExecutionResult = { - printLogo(s"FlinkTable $jobName Starting...") + Utils.printLogo(s"FlinkTable $jobName Starting...") null } diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/TableContext.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/TableContext.scala index 64337d26bc..8ca8e3fe11 100644 --- a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/TableContext.scala +++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/TableContext.scala @@ -17,7 +17,7 @@ package org.apache.streampark.flink.core -import org.apache.streampark.common.conf.ConfigConst.printLogo +import org.apache.streampark.common.util.Utils import org.apache.flink.api.common.JobExecutionResult import org.apache.flink.api.java.utils.ParameterTool @@ -46,7 +46,7 @@ class TableContext(override val parameter: ParameterTool, private val tableEnv: override def listFullModules(): Array[ModuleEntry] = tableEnv.listFullModules() override def execute(jobName: String): JobExecutionResult = { - printLogo(s"FlinkTable $jobName Starting...") + Utils.printLogo(s"FlinkTable $jobName Starting...") null } diff --git a/streampark-flink/streampark-flink-sqlclient/src/main/scala/org/apache/streampark/flink/cli/SqlClient.scala b/streampark-flink/streampark-flink-sqlclient/src/main/scala/org/apache/streampark/flink/cli/SqlClient.scala index 3b094d0862..e305f428b3 100644 --- a/streampark-flink/streampark-flink-sqlclient/src/main/scala/org/apache/streampark/flink/cli/SqlClient.scala +++ b/streampark-flink/streampark-flink-sqlclient/src/main/scala/org/apache/streampark/flink/cli/SqlClient.scala @@ -17,7 +17,7 @@ package org.apache.streampark.flink.cli -import org.apache.streampark.common.conf.ConfigConst.{KEY_APP_CONF, KEY_FLINK_SQL, KEY_FLINK_TABLE_MODE} +import org.apache.streampark.common.conf.ConfigKeys.{KEY_APP_CONF, KEY_FLINK_SQL, KEY_FLINK_TABLE_MODE} import org.apache.streampark.common.util.{DeflaterUtils, PropertiesUtils} import org.apache.streampark.flink.core.{SqlCommand, SqlCommandParser} import org.apache.streampark.flink.core.scala.{FlinkStreamTable, FlinkTable}