diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala index 7ce7a85a07..0cb97401b4 100644 --- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala +++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala @@ -88,17 +88,26 @@ private[flink] class FlinkStreamingInitializer(args: Array[String], apiType: Api case file => file } val configMap = parseConfig(config) - val properConf = extractConfigByPrefix(configMap, KEY_FLINK_PROPERTY_PREFIX) + val appFlinkConf = extractConfigByPrefix(configMap, KEY_FLINK_PROPERTY_PREFIX) val appConf = extractConfigByPrefix(configMap, KEY_APP_PREFIX) // config priority: explicitly specified priority > project profiles > system profiles val parameter = ParameterTool .fromSystemProperties() - .mergeWith(ParameterTool.fromMap(properConf)) + .mergeWith(ParameterTool.fromMap(appFlinkConf)) .mergeWith(ParameterTool.fromMap(appConf)) .mergeWith(argsMap) - val envConfig = Configuration.fromMap(properConf) + val flinkConf: Map[String, String] = { + parameter.get(KEY_FLINK_CONF(), null) match { + case flinkConf if flinkConf != null => + PropertiesUtils + .loadFlinkConfYaml(DeflaterUtils.unzipString(flinkConf)) + .filter(_._2.nonEmpty) + case _ => Map.empty + } + } + val envConfig = Configuration.fromMap(flinkConf ++ appFlinkConf) FlinkConfiguration(parameter, envConfig, null) } diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala index 7cb463ed75..a69fcc32b5 100644 --- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala +++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala @@ -196,23 +196,32 @@ private[flink] class FlinkTableInitializer(args: Array[String], apiType: ApiType }) // config priority: explicitly specified priority > project profiles > system profiles - val properConf = + val appFlinkConf = extractConfigByPrefix(configMap, KEY_FLINK_PROPERTY_PREFIX) val appConf = extractConfigByPrefix(configMap, KEY_APP_PREFIX) val tableConf = extractConfigByPrefix(configMap, KEY_FLINK_TABLE_PREFIX) - val tableConfig = Configuration.fromMap(tableConf) - val envConfig = Configuration.fromMap(properConf) - val parameter = ParameterTool .fromSystemProperties() - .mergeWith(ParameterTool.fromMap(properConf)) + .mergeWith(ParameterTool.fromMap(appFlinkConf)) .mergeWith(ParameterTool.fromMap(tableConf)) .mergeWith(ParameterTool.fromMap(appConf)) .mergeWith(ParameterTool.fromMap(sqlConf)) .mergeWith(argsMap) + val flinkConf: Map[String, String] = { + parameter.get(KEY_FLINK_CONF(), null) match { + case flinkConf if flinkConf != null => + PropertiesUtils + .loadFlinkConfYaml(DeflaterUtils.unzipString(flinkConf)) + .filter(_._2.nonEmpty) + case _ => Map.empty + } + } + + val envConfig = Configuration.fromMap(flinkConf ++ appFlinkConf) + val tableConfig = Configuration.fromMap(tableConf) FlinkConfiguration(parameter, envConfig, tableConfig) } }