Skip to content

Commit

Permalink
[Improve] Constant improvement (#3272)
Browse files Browse the repository at this point in the history
* [Improve] Constant improvement

* minor improvement

---------

Co-authored-by: benjobs <[email protected]>
  • Loading branch information
wolfboys and benjobs authored Oct 23, 2023
1 parent db330bb commit a35618b
Show file tree
Hide file tree
Showing 56 changed files with 173 additions and 163 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,35 @@
package org.apache.streampark.common;

/** A constant class to hold the constants variables. */
public class Constant {
public final class Constant {

private Constant() {}

public static final String DEFAULT = "default";

public static final String STREAM_PARK = "streampark";

public static final String HTTP_SCHEMA = "http://";

public static final String HTTPS_SCHEMA = "https://";

public static final String JAR_SUFFIX = ".jar";

public static final String ZIP_SUFFIX = ".zip";

public static final String EMPTY_STRING = "";

public static final String PYTHON_SUFFIX = ".py";

public static final String SEMICOLON = ";";

public static final String DEFAULT_DATAMASK_STRING = "********";

public static final String PYTHON_FLINK_DRIVER_CLASS_NAME =
"org.apache.flink.client.python.PythonDriver";

public static final String STREAMPARK_FLINKSQL_CLIENT_CLASS =
"org.apache.streampark.flink.cli.SqlClient";

public static final String PYTHON_EXECUTABLE = "venv.zip/venv/bin/python3";
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,10 @@
*/
package org.apache.streampark.common.conf

import java.time.LocalDateTime

object ConfigConst {

/** common const */
val DEFAULT_DATAMASK_STRING = "********"
object ConfigKeys {

val PARAM_PREFIX = "--"

/** pyflink */
val PYTHON_SUFFIX = ".py"

val PYTHON_DRIVER_CLASS_NAME = "org.apache.flink.client.python.PythonDriver"

val PYTHON_EXECUTABLE = "venv.zip/venv/bin/python3"

/** about parameter... */
val KEY_APP_HOME = "app.home"

Expand All @@ -49,9 +37,6 @@ object ConfigConst {

val KEY_SEMANTIC = "semantic"

/** sign.... */
val SIGN_EMPTY = ""

/** kerberos */
val KEY_KERBEROS = "kerberos"

Expand Down Expand Up @@ -180,23 +165,4 @@ object ConfigConst {

val KEY_FLINK_TM_PROCESS_MEMORY = "taskmanager.memory.process.size"

val STREAMPARK_FLINKSQL_CLIENT_CLASS = "org.apache.streampark.flink.cli.SqlClient"

def printLogo(info: String): Unit = {
// scalastyle:off println
println("\n")
println(" _____ __ __ ")
println(" / ___// /_________ ____ _____ ___ ____ ____ ______/ /__ ")
println(" \\__ \\/ __/ ___/ _ \\/ __ `/ __ `__ \\/ __ \\ __ `/ ___/ //_/")
println(" ___/ / /_/ / / __/ /_/ / / / / / / /_/ / /_/ / / / ,< ")
println(" /____/\\__/_/ \\___/\\__,_/_/ /_/ /_/ ____/\\__,_/_/ /_/|_| ")
println(" /_/ \n\n")
println(" Version: 2.2.0-SNAPSHOT ")
println(" WebSite: https://streampark.apache.org ")
println(" GitHub : https://github.com/apache/incubator-streampark ")
println(s" Info : $info ")
println(s" Time : ${LocalDateTime.now} \n\n")
// scalastyle:on println
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.streampark.common.conf

import org.apache.streampark.common.Constant
import org.apache.streampark.common.util.{Logger, SystemPropertyUtils}
import org.apache.streampark.common.util.ImplicitsUtils._

Expand Down Expand Up @@ -164,7 +165,7 @@ object InternalConfigHolder extends Logger {
| ${configKeys
.map(
key =>
s"$key = ${if (key.contains("password")) ConfigConst.DEFAULT_DATAMASK_STRING
s"$key = ${if (key.contains("password")) Constant.DEFAULT_DATAMASK_STRING
else get(key)}")
.mkString("\n ")}""".stripMargin)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
*/
package org.apache.streampark.common.util

import org.apache.streampark.common.conf.ConfigConst._
import org.apache.streampark.common.Constant
import org.apache.streampark.common.conf.ConfigKeys._

import java.util.{Map => JavaMap, Properties}

Expand Down Expand Up @@ -52,7 +53,7 @@ object ConfigUtils {
val kafkaProperty = new Properties()
param.foreach(x => kafkaProperty.put(x._1, x._2.trim))
val _topic = topic match {
case SIGN_EMPTY =>
case Constant.EMPTY_STRING =>
val top = kafkaProperty.getOrElse(KEY_KAFKA_TOPIC, null)
if (top == null || top.split(",|\\s+").length > 1) {
throw new IllegalArgumentException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*/
package org.apache.streampark.common.util

import org.apache.streampark.common.conf.ConfigConst._
import org.apache.streampark.common.conf.ConfigKeys._

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@

package org.apache.streampark.common.util

import org.apache.streampark.common.conf.{CommonConfig, ConfigConst, InternalConfigHolder}
import org.apache.streampark.common.conf.ConfigConst._
import org.apache.streampark.common.conf.{CommonConfig, ConfigKeys, InternalConfigHolder}
import org.apache.streampark.common.conf.ConfigKeys._

import org.apache.commons.collections.CollectionUtils
import org.apache.commons.lang3.StringUtils
Expand Down Expand Up @@ -62,7 +62,7 @@ object HadoopUtils extends Logger {
InternalConfigHolder.get(CommonConfig.STREAMPARK_HADOOP_USER_NAME)

private[this] lazy val kerberosConf: Map[String, String] =
SystemPropertyUtils.get(ConfigConst.KEY_APP_HOME, null) match {
SystemPropertyUtils.get(ConfigKeys.KEY_APP_HOME, null) match {
case null =>
getClass.getResourceAsStream("/kerberos.yml") match {
case x if x != null => PropertiesUtils.fromYamlFile(x)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*/
package org.apache.streampark.common.util

import org.apache.streampark.common.conf.ConfigConst._
import org.apache.streampark.common.conf.ConfigKeys._

import com.zaxxer.hikari.{HikariConfig, HikariDataSource}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*/
package org.apache.streampark.common.util

import org.apache.streampark.common.conf.ConfigConst._
import org.apache.streampark.common.conf.ConfigKeys._

import com.mongodb._

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@

package org.apache.streampark.common.util

import org.apache.streampark.common.conf.ConfigConst
import org.apache.streampark.common.Constant
import org.apache.streampark.common.conf.ConfigKeys

import redis.clients.jedis._
import redis.clients.jedis.exceptions.JedisConnectionException
Expand Down Expand Up @@ -92,7 +93,7 @@ object RedisClient extends Logger {
* @return
*/
def createJedisPool(endpoint: RedisEndpoint): JedisPool = {
val endpointEn: RedisEndpoint = endpoint.copy(auth = ConfigConst.DEFAULT_DATAMASK_STRING)
val endpointEn: RedisEndpoint = endpoint.copy(auth = Constant.DEFAULT_DATAMASK_STRING)
logInfo(s"[StreamPark] RedisClient: createJedisPool with $endpointEn ")
new JedisPool(
poolConfig,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.streampark.common.util

import org.apache.streampark.common.conf.ConfigConst._
import org.apache.streampark.common.conf.ConfigKeys._

import redis.clients.jedis.{Jedis, Protocol}
import redis.clients.jedis.util.{JedisClusterCRC16, JedisURIHelper, SafeEncoder}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import org.apache.commons.lang3.StringUtils

import java.io._
import java.net.URL
import java.time.Duration
import java.time.{Duration, LocalDateTime}
import java.util.{jar, Collection => JavaCollection, Map => JavaMap, Properties, UUID}
import java.util.concurrent.locks.LockSupport
import java.util.jar.{JarFile, JarInputStream}
Expand Down Expand Up @@ -156,4 +156,21 @@ object Utils extends Logger {
}
}

def printLogo(info: String): Unit = {
// scalastyle:off println
println("\n")
println(" _____ __ __ ")
println(" / ___// /_________ ____ _____ ___ ____ ____ ______/ /__ ")
println(" \\__ \\/ __/ ___/ _ \\/ __ `/ __ `__ \\/ __ \\ __ `/ ___/ //_/")
println(" ___/ / /_/ / / __/ /_/ / / / / / / /_/ / /_/ / / / ,< ")
println(" /____/\\__/_/ \\___/\\__,_/_/ /_/ /_/ ____/\\__,_/_/ /_/|_| ")
println(" /_/ \n\n")
println(" Version: 2.2.0-SNAPSHOT ")
println(" WebSite: https://streampark.apache.org ")
println(" GitHub : https://github.com/apache/incubator-streampark ")
println(s" Info : $info ")
println(s" Time : ${LocalDateTime.now} \n\n")
// scalastyle:on println
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.streampark.console.base.util;

import org.apache.streampark.common.conf.ConfigConst;
import org.apache.streampark.common.conf.ConfigKeys;

import org.apache.commons.lang3.StringUtils;

Expand Down Expand Up @@ -97,7 +97,7 @@ public static String camelToUnderscore(String value) {
}

public static String getAppHome() {
return System.getProperty(ConfigConst.KEY_APP_HOME());
return System.getProperty(ConfigKeys.KEY_APP_HOME());
}

public static File getAppDir(String dir) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.streampark.console.core.entity;

import org.apache.streampark.common.Constant;
import org.apache.streampark.common.conf.ConfigConst;
import org.apache.streampark.common.conf.ConfigKeys;
import org.apache.streampark.common.conf.Workspace;
import org.apache.streampark.common.enums.ApplicationType;
import org.apache.streampark.common.enums.FlinkDevelopmentMode;
Expand Down Expand Up @@ -278,10 +278,10 @@ public void setYarnQueueByHotParams() {

Map<String, Object> hotParamsMap = this.getHotParamsMap();
if (MapUtils.isNotEmpty(hotParamsMap)
&& hotParamsMap.containsKey(ConfigConst.KEY_YARN_APP_QUEUE())) {
String yarnQueue = hotParamsMap.get(ConfigConst.KEY_YARN_APP_QUEUE()).toString();
&& hotParamsMap.containsKey(ConfigKeys.KEY_YARN_APP_QUEUE())) {
String yarnQueue = hotParamsMap.get(ConfigKeys.KEY_YARN_APP_QUEUE()).toString();
String labelExpr =
Optional.ofNullable(hotParamsMap.get(ConfigConst.KEY_YARN_APP_NODE_LABEL()))
Optional.ofNullable(hotParamsMap.get(ConfigKeys.KEY_YARN_APP_NODE_LABEL()))
.map(Object::toString)
.orElse(null);
this.setYarnQueue(YarnQueueLabelExpression.of(yarnQueue, labelExpr).toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.streampark.console.core.entity;

import org.apache.streampark.common.conf.ConfigConst;
import org.apache.streampark.common.conf.ConfigKeys;
import org.apache.streampark.common.util.DeflaterUtils;
import org.apache.streampark.common.util.PropertiesUtils;
import org.apache.streampark.console.core.enums.ConfigFileTypeEnum;
Expand Down Expand Up @@ -103,16 +103,16 @@ public Map<String, String> readConfig() {
Collectors.toMap(
entry -> {
String key = entry.getKey();
if (key.startsWith(ConfigConst.KEY_FLINK_OPTION_PREFIX())) {
key = key.substring(ConfigConst.KEY_FLINK_OPTION_PREFIX().length());
} else if (key.startsWith(ConfigConst.KEY_FLINK_PROPERTY_PREFIX())) {
key = key.substring(ConfigConst.KEY_FLINK_PROPERTY_PREFIX().length());
} else if (key.startsWith(ConfigConst.KEY_FLINK_TABLE_PREFIX())) {
key = key.substring(ConfigConst.KEY_FLINK_TABLE_PREFIX().length());
} else if (key.startsWith(ConfigConst.KEY_APP_PREFIX())) {
key = key.substring(ConfigConst.KEY_APP_PREFIX().length());
} else if (key.startsWith(ConfigConst.KEY_SQL_PREFIX())) {
key = key.substring(ConfigConst.KEY_SQL_PREFIX().length());
if (key.startsWith(ConfigKeys.KEY_FLINK_OPTION_PREFIX())) {
key = key.substring(ConfigKeys.KEY_FLINK_OPTION_PREFIX().length());
} else if (key.startsWith(ConfigKeys.KEY_FLINK_PROPERTY_PREFIX())) {
key = key.substring(ConfigKeys.KEY_FLINK_PROPERTY_PREFIX().length());
} else if (key.startsWith(ConfigKeys.KEY_FLINK_TABLE_PREFIX())) {
key = key.substring(ConfigKeys.KEY_FLINK_TABLE_PREFIX().length());
} else if (key.startsWith(ConfigKeys.KEY_APP_PREFIX())) {
key = key.substring(ConfigKeys.KEY_APP_PREFIX().length());
} else if (key.startsWith(ConfigKeys.KEY_SQL_PREFIX())) {
key = key.substring(ConfigKeys.KEY_SQL_PREFIX().length());
}
return key;
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.streampark.console.core.entity;

import org.apache.streampark.common.conf.ConfigConst;
import org.apache.streampark.common.conf.ConfigKeys;
import org.apache.streampark.common.enums.ClusterState;
import org.apache.streampark.common.enums.FlinkExecutionMode;
import org.apache.streampark.common.enums.FlinkK8sRestExposedType;
Expand Down Expand Up @@ -138,7 +138,7 @@ public Map<String, Object> getOptionMap() {
}
Map<String, Object> map = JacksonUtils.read(this.options, Map.class);
if (FlinkExecutionMode.YARN_SESSION == getFlinkExecutionModeEnum()) {
map.put(ConfigConst.KEY_YARN_APP_NAME(), this.clusterName);
map.put(ConfigKeys.KEY_YARN_APP_NAME(), this.clusterName);
map.putAll(YarnQueueLabelExpression.getQueueLabelMap(yarnQueue));
}
map.entrySet().removeIf(entry -> entry.getValue() == null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.streampark.console.core.entity;

import org.apache.streampark.common.conf.ConfigConst;
import org.apache.streampark.common.Constant;

import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
Expand Down Expand Up @@ -70,7 +70,7 @@ public class Variable implements Serializable {

public void dataMasking() {
if (desensitization) {
this.setVariableValue(ConfigConst.DEFAULT_DATAMASK_STRING());
this.setVariableValue(Constant.DEFAULT_DATAMASK_STRING);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.streampark.console.core.runner;

import org.apache.streampark.common.conf.CommonConfig;
import org.apache.streampark.common.conf.ConfigConst;
import org.apache.streampark.common.conf.ConfigKeys;
import org.apache.streampark.common.conf.InternalConfigHolder;
import org.apache.streampark.common.conf.InternalOption;
import org.apache.streampark.common.conf.Workspace;
Expand Down Expand Up @@ -92,14 +92,14 @@ public void run(ApplicationArguments args) throws Exception {
+ " The system initialization check failed. If started local for development and debugging,"
+ " please ensure the -D%s parameter is clearly specified,"
+ " more detail: https://streampark.apache.org/docs/user-guide/deployment",
ConfigConst.KEY_APP_HOME()));
ConfigKeys.KEY_APP_HOME()));
}

// init InternalConfig
initInternalConfig(context.getEnvironment());
// overwrite system variable HADOOP_USER_NAME
String hadoopUserName = InternalConfigHolder.get(CommonConfig.STREAMPARK_HADOOP_USER_NAME());
overrideSystemProp(ConfigConst.KEY_HADOOP_USER_NAME(), hadoopUserName);
overrideSystemProp(ConfigKeys.KEY_HADOOP_USER_NAME(), hadoopUserName);
// initialize local file system resources
storageInitialize(LFS);
// Launch the embedded http file server.
Expand Down
Loading

0 comments on commit a35618b

Please sign in to comment.