From 9721ce835f5a7f28f2ad187346e009633307097b Mon Sep 17 00:00:00 2001 From: JunRuiLee Date: Wed, 23 Aug 2023 22:48:30 +0800 Subject: [PATCH] [FLINK-33577][dist] Change the default config file to config.yaml in flink-dist. This closes #24177. --- .../kubernetes_config_configuration.html | 2 +- .../generated/python_configuration.html | 2 +- .../generated/rocksdb_configuration.html | 2 +- .../state_backend_rocksdb_section.html | 2 +- .../java/hadoop/mapred/utils/HadoopUtils.java | 2 +- .../restartstrategy/RestartStrategies.java | 2 +- .../flink/configuration/ConfigConstants.java | 4 +- .../configuration/ResourceManagerOptions.java | 2 +- flink-dist/src/main/assemblies/bin.xml | 2 +- flink-dist/src/main/flink-bin/bin/config.sh | 8 +- flink-dist/src/main/resources/config.yaml | 298 +++++++++++++++++ flink-dist/src/main/resources/flink-conf.yaml | 311 ------------------ .../flink/dist/BashJavaUtilsITCase.java | 51 ++- .../tests/util/flink/FlinkDistribution.java | 6 +- flink-end-to-end-tests/test-scripts/common.sh | 18 +- .../test-scripts/common_yarn_docker.sh | 4 +- .../test-scripts/test_pyflink.sh | 10 +- .../test_yarn_application_kerberos_docker.sh | 2 +- .../test_yarn_job_kerberos_docker.sh | 2 +- .../parquet/ParquetVectorizedInputFormat.java | 2 +- .../org/apache/flink/api/java/DataSet.java | 2 +- .../KubernetesConfigOptions.java | 2 +- .../decorators/FlinkConfMountDecorator.java | 2 +- .../parameters/KubernetesParameters.java | 2 +- flink-python/dev/dev-requirements.txt | 1 + flink-python/pyflink/common/configuration.py | 13 +- .../pyflink/common/restart_strategy.py | 2 +- .../pyflink/datastream/state_backend.py | 16 +- .../stream_execution_environment.py | 2 +- .../pyflink/pyflink_gateway_server.py | 68 ++-- flink-python/pyflink/table/table_config.py | 6 +- .../pyflink/table/table_environment.py | 34 +- flink-python/setup.py | 4 +- .../org/apache/flink/python/PythonConfig.java | 2 +- .../apache/flink/python/PythonOptions.java | 2 +- ...startBackoffTimeStrategyFactoryLoader.java | 4 +- .../state/CheckpointStorageLoader.java | 4 +- .../state/filesystem/FsStateBackend.java | 2 +- .../state/memory/MemoryStateBackend.java | 2 +- .../state/EmbeddedRocksDBStateBackend.java | 4 +- .../streaming/state/RocksDBOptions.java | 2 +- .../streaming/state/RocksDBStateBackend.java | 7 +- .../state/RocksDBStateBackendConfigTest.java | 6 +- .../StreamExecutionEnvironment.java | 2 +- .../service/context/DefaultContext.java | 4 +- .../util/SqlGatewayRestEndpointTestUtils.java | 2 +- .../service/context/SessionContextTest.java | 2 +- .../apache/flink/table/api/TableConfig.java | 4 +- .../container/FlinkContainersSettings.java | 4 +- .../container/FlinkImageBuilder.java | 15 +- .../org/apache/flink/yarn/YarnTestBase.java | 10 +- .../java/org/apache/flink/yarn/Utils.java | 2 +- 52 files changed, 510 insertions(+), 456 deletions(-) create mode 100644 flink-dist/src/main/resources/config.yaml delete mode 100644 flink-dist/src/main/resources/flink-conf.yaml mode change 100644 => 100755 flink-end-to-end-tests/test-scripts/common.sh diff --git a/docs/layouts/shortcodes/generated/kubernetes_config_configuration.html b/docs/layouts/shortcodes/generated/kubernetes_config_configuration.html index 7bbdfd5e404f5..86d147f6c0080 100644 --- a/docs/layouts/shortcodes/generated/kubernetes_config_configuration.html +++ b/docs/layouts/shortcodes/generated/kubernetes_config_configuration.html @@ -90,7 +90,7 @@
kubernetes.flink.conf.dir
"/opt/flink/conf" String - The flink conf directory that will be mounted in pod. The flink-conf.yaml, log4j.properties, logback.xml in this path will be overwritten from config map. + The flink conf directory that will be mounted in pod. The config.yaml, log4j.properties, logback.xml in this path will be overwritten from config map.
kubernetes.flink.log.dir
diff --git a/docs/layouts/shortcodes/generated/python_configuration.html b/docs/layouts/shortcodes/generated/python_configuration.html index 60ef6a9676ef2..d99c1f2a3b95a 100644 --- a/docs/layouts/shortcodes/generated/python_configuration.html +++ b/docs/layouts/shortcodes/generated/python_configuration.html @@ -18,7 +18,7 @@
python.client.executable
"python" String - The path of the Python interpreter used to launch the Python process when submitting the Python jobs via "flink run" or compiling the Java/Scala jobs containing Python UDFs. Equivalent to the command line option "-pyclientexec" or the environment variable PYFLINK_CLIENT_EXECUTABLE. The priority is as following:
1. the configuration 'python.client.executable' defined in the source code(Only used in Flink Java SQL/Table API job call Python UDF);
2. the command line option "-pyclientexec";
3. the configuration 'python.client.executable' defined in flink-conf.yaml
4. the environment variable PYFLINK_CLIENT_EXECUTABLE; + The path of the Python interpreter used to launch the Python process when submitting the Python jobs via "flink run" or compiling the Java/Scala jobs containing Python UDFs. Equivalent to the command line option "-pyclientexec" or the environment variable PYFLINK_CLIENT_EXECUTABLE. The priority is as follows:
1. the configuration 'python.client.executable' defined in the source code (only used when a Flink Java SQL/Table API job calls a Python UDF);
2. the command line option "-pyclientexec";
3. the configuration 'python.client.executable' defined in config.yaml;
4. the environment variable PYFLINK_CLIENT_EXECUTABLE;
python.executable
diff --git a/docs/layouts/shortcodes/generated/rocksdb_configuration.html b/docs/layouts/shortcodes/generated/rocksdb_configuration.html index 5cd25cda3f884..b6ad67234e0a3 100644 --- a/docs/layouts/shortcodes/generated/rocksdb_configuration.html +++ b/docs/layouts/shortcodes/generated/rocksdb_configuration.html @@ -30,7 +30,7 @@
state.backend.rocksdb.memory.fixed-per-tm
(none) MemorySize - The fixed total amount of memory, shared among all RocksDB instances per Task Manager (cluster-level option). This option only takes effect if neither 'state.backend.rocksdb.memory.managed' nor 'state.backend.rocksdb.memory.fixed-per-slot' are not configured. If none is configured then each RocksDB column family state has its own memory caches (as controlled by the column family options). The relevant options for the shared resources (e.g. write-buffer-ratio) can be set on the same level (flink-conf.yaml).Note, that this feature breaks resource isolation between the slots + The fixed total amount of memory, shared among all RocksDB instances per Task Manager (cluster-level option). This option only takes effect if neither 'state.backend.rocksdb.memory.managed' nor 'state.backend.rocksdb.memory.fixed-per-slot' is configured. If neither is configured, each RocksDB column family state has its own memory caches (as controlled by the column family options). The relevant options for the shared resources (e.g. write-buffer-ratio) can be set on the same level (config.yaml). Note that this feature breaks resource isolation between the slots
state.backend.rocksdb.memory.high-prio-pool-ratio
diff --git a/docs/layouts/shortcodes/generated/state_backend_rocksdb_section.html b/docs/layouts/shortcodes/generated/state_backend_rocksdb_section.html index 2dd92d7c0e058..7d917bb7ae8f2 100644 --- a/docs/layouts/shortcodes/generated/state_backend_rocksdb_section.html +++ b/docs/layouts/shortcodes/generated/state_backend_rocksdb_section.html @@ -18,7 +18,7 @@
state.backend.rocksdb.memory.fixed-per-tm
(none) MemorySize - The fixed total amount of memory, shared among all RocksDB instances per Task Manager (cluster-level option). This option only takes effect if neither 'state.backend.rocksdb.memory.managed' nor 'state.backend.rocksdb.memory.fixed-per-slot' are not configured. If none is configured then each RocksDB column family state has its own memory caches (as controlled by the column family options). The relevant options for the shared resources (e.g. write-buffer-ratio) can be set on the same level (flink-conf.yaml).Note, that this feature breaks resource isolation between the slots + The fixed total amount of memory, shared among all RocksDB instances per Task Manager (cluster-level option). This option only takes effect if neither 'state.backend.rocksdb.memory.managed' nor 'state.backend.rocksdb.memory.fixed-per-slot' is configured. If neither is configured, each RocksDB column family state has its own memory caches (as controlled by the column family options). The relevant options for the shared resources (e.g. write-buffer-ratio) can be set on the same level (config.yaml). Note that this feature breaks resource isolation between the slots
state.backend.rocksdb.memory.high-prio-pool-ratio
diff --git a/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/mapred/utils/HadoopUtils.java b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/mapred/utils/HadoopUtils.java index 0cf31d5cabc1b..67a53405d24fa 100644 --- a/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/mapred/utils/HadoopUtils.java +++ b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/mapred/utils/HadoopUtils.java @@ -55,7 +55,7 @@ public static void mergeHadoopConf(JobConf jobConf) { /** * Returns a new Hadoop Configuration object using the path to the hadoop conf configured in the - * main configuration (flink-conf.yaml). This method is public because its being used in the + * main configuration (config.yaml). This method is public because its being used in the * HadoopDataSource. * * @param flinkConfiguration Flink configuration object diff --git a/flink-core/src/main/java/org/apache/flink/api/common/restartstrategy/RestartStrategies.java b/flink-core/src/main/java/org/apache/flink/api/common/restartstrategy/RestartStrategies.java index 191d89d957f03..b099d94489216 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/restartstrategy/RestartStrategies.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/restartstrategy/RestartStrategies.java @@ -503,7 +503,7 @@ public int hashCode() { /** * Restart strategy configuration that could be used by jobs to use cluster level restart * strategy. Useful especially when one has a custom implementation of restart strategy set via - * flink-conf.yaml. + * config.yaml. */ @PublicEvolving public static final class FallbackRestartStrategyConfiguration diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java index ad7857af06d11..b7751abd07b76 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java @@ -391,7 +391,7 @@ public final class ConfigConstants { /** * Prefix for passing custom environment variables to Flink's master process. For example for * passing LD_LIBRARY_PATH as an env variable to the AppMaster, set: - * containerized.master.env.LD_LIBRARY_PATH: "/usr/lib/native" in the flink-conf.yaml. + * containerized.master.env.LD_LIBRARY_PATH: "/usr/lib/native" in the config.yaml. * * @deprecated Use {@link ResourceManagerOptions#CONTAINERIZED_MASTER_ENV_PREFIX} instead. */ @@ -482,7 +482,7 @@ public final class ConfigConstants { /** * Prefix for passing custom environment variables to Flink's ApplicationMaster (JobManager). * For example for passing LD_LIBRARY_PATH as an env variable to the AppMaster, set: - * yarn.application-master.env.LD_LIBRARY_PATH: "/usr/lib/native" in the flink-conf.yaml. + * yarn.application-master.env.LD_LIBRARY_PATH: "/usr/lib/native" in the config.yaml. * * @deprecated Please use {@code CONTAINERIZED_MASTER_ENV_PREFIX}. 
*/ diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java index e846e0a114ee1..10283e68657a9 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java @@ -276,7 +276,7 @@ public class ResourceManagerOptions { /** * Prefix for passing custom environment variables to Flink's master process. For example for * passing LD_LIBRARY_PATH as an env variable to the AppMaster, set: - * containerized.master.env.LD_LIBRARY_PATH: "/usr/lib/native" in the flink-conf.yaml. + * containerized.master.env.LD_LIBRARY_PATH: "/usr/lib/native" in the config.yaml. */ public static final String CONTAINERIZED_MASTER_ENV_PREFIX = "containerized.master.env."; diff --git a/flink-dist/src/main/assemblies/bin.xml b/flink-dist/src/main/assemblies/bin.xml index 9753e1db00365..3ac51f4562ffc 100644 --- a/flink-dist/src/main/assemblies/bin.xml +++ b/flink-dist/src/main/assemblies/bin.xml @@ -123,7 +123,7 @@ under the License. - src/main/resources/flink-conf.yaml + src/main/resources/config.yaml conf 0644 diff --git a/flink-dist/src/main/flink-bin/bin/config.sh b/flink-dist/src/main/flink-bin/bin/config.sh index d90e4362f7e1e..2959f9c5ece47 100755 --- a/flink-dist/src/main/flink-bin/bin/config.sh +++ b/flink-dist/src/main/flink-bin/bin/config.sh @@ -112,12 +112,12 @@ readFromConfig() { } ######################################################################################################################## -# DEFAULT CONFIG VALUES: These values will be used when nothing has been specified in conf/flink-conf.yaml +# DEFAULT CONFIG VALUES: These values will be used when nothing has been specified in conf/config.yaml # -or- the respective environment variables are not set. ######################################################################################################################## # WARNING !!! , these values are only used if there is nothing else is specified in -# conf/flink-conf.yaml +# conf/config.yaml DEFAULT_ENV_PID_DIR="/tmp" # Directory to store *.pid files to DEFAULT_ENV_LOG_MAX=10 # Maximum number of old log files to keep @@ -134,7 +134,7 @@ DEFAULT_HADOOP_CONF_DIR="" # Hadoop Configuration Direc DEFAULT_HBASE_CONF_DIR="" # HBase Configuration Directory, if necessary ######################################################################################################################## -# CONFIG KEYS: The default values can be overwritten by the following keys in conf/flink-conf.yaml +# CONFIG KEYS: The default values can be overwritten by the following keys in conf/config.yaml ######################################################################################################################## KEY_TASKM_COMPUTE_NUMA="taskmanager.compute.numa" @@ -351,7 +351,7 @@ if [ -z "${HIGH_AVAILABILITY}" ]; then fi # Arguments for the JVM. Used for job and task manager JVMs. -# DO NOT USE FOR MEMORY SETTINGS! Use conf/flink-conf.yaml with keys +# DO NOT USE FOR MEMORY SETTINGS! Use conf/config.yaml with keys # JobManagerOptions#TOTAL_PROCESS_MEMORY and TaskManagerOptions#TOTAL_PROCESS_MEMORY for that! 
if [ -z "${JVM_ARGS}" ]; then JVM_ARGS="" diff --git a/flink-dist/src/main/resources/config.yaml b/flink-dist/src/main/resources/config.yaml new file mode 100644 index 0000000000000..08aace171ea5e --- /dev/null +++ b/flink-dist/src/main/resources/config.yaml @@ -0,0 +1,298 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +# These parameters are required for Java 17 support. +# They can be safely removed when using Java 8/11. +env: + java: + opts: + all: --add-exports=java.base/sun.net.util=ALL-UNNAMED --add-exports=java.rmi/sun.rmi.registry=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.api=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.file=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.parser=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.tree=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.util=ALL-UNNAMED --add-exports=java.security.jgss/sun.security.krb5=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.text=ALL-UNNAMED --add-opens=java.base/java.time=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.locks=ALL-UNNAMED + +#============================================================================== +# Common +#============================================================================== + +jobmanager: + # The host interface the JobManager will bind to. By default, this is localhost, and will prevent + # the JobManager from communicating outside the machine/container it is running on. + # On YARN this setting will be ignored if it is set to 'localhost', defaulting to 0.0.0.0. + # On Kubernetes this setting will be ignored, defaulting to 0.0.0.0. + # + # To enable this, set the bind-host address to one that has access to an outside facing network + # interface, such as 0.0.0.0. + bind-host: localhost + rpc: + # The external address of the host on which the JobManager runs and can be + # reached by the TaskManagers and any clients which want to connect. This setting + # is only used in Standalone mode and may be overwritten on the JobManager side + # by specifying the --host parameter of the bin/jobmanager.sh executable. 
+ # In high availability mode, if you use the bin/start-cluster.sh script and setup + # the conf/masters file, this will be taken care of automatically. Yarn + # automatically configure the host name based on the hostname of the node where the + # JobManager runs. + address: localhost + # The RPC port where the JobManager is reachable. + port: 6123 + memory: + process: + # The total process memory size for the JobManager. + # Note this accounts for all memory usage within the JobManager process, including JVM metaspace and other overhead. + size: 1600m + execution: + # The failover strategy, i.e., how the job computation recovers from task failures. + # Only restart tasks that may have been affected by the task failure, which typically includes + # downstream tasks and potentially upstream tasks if their produced data is no longer available for consumption. + failover-strategy: region + +taskmanager: + # The host interface the TaskManager will bind to. By default, this is localhost, and will prevent + # the TaskManager from communicating outside the machine/container it is running on. + # On YARN this setting will be ignored if it is set to 'localhost', defaulting to 0.0.0.0. + # On Kubernetes this setting will be ignored, defaulting to 0.0.0.0. + # + # To enable this, set the bind-host address to one that has access to an outside facing network + # interface, such as 0.0.0.0. + bind-host: localhost + # The address of the host on which the TaskManager runs and can be reached by the JobManager and + # other TaskManagers. If not specified, the TaskManager will try different strategies to identify + # the address. + # + # Note this address needs to be reachable by the JobManager and forward traffic to one of + # the interfaces the TaskManager is bound to (see 'taskmanager.bind-host'). + # + # Note also that unless all TaskManagers are running on the same machine, this address needs to be + # configured separately for each TaskManager. + host: localhost + # The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline. + numberOfTaskSlots: 1 + memory: + process: + # The total process memory size for the TaskManager. + # + # Note this accounts for all memory usage within the TaskManager process, including JVM metaspace and other overhead. + # To exclude JVM metaspace and overhead, please, use total Flink memory size instead of 'taskmanager.memory.process.size'. + # It is not recommended to set both 'taskmanager.memory.process.size' and Flink memory. + size: 1728m + +parallelism: + # The parallelism used for programs that did not specify and other parallelism. + default: 1 + +# # The default file system scheme and authority. +# # By default file paths without scheme are interpreted relative to the local +# # root file system 'file:///'. Use this to override the default and interpret +# # relative paths relative to a different file system, +# # for example 'hdfs://mynamenode:12345' +# fs: +# default-scheme: hdfs://mynamenode:12345 + +#============================================================================== +# High Availability +#============================================================================== + +# high-availability: +# # The high-availability mode. Possible options are 'NONE' or 'zookeeper'. +# type: zookeeper +# # The path where metadata for master recovery is persisted. While ZooKeeper stores +# # the small ground truth for checkpoint and leader election, this location stores +# # the larger objects, like persisted dataflow graphs. 
+# # +# # Must be a durable file system that is accessible from all nodes +# # (like HDFS, S3, Ceph, nfs, ...) +# storageDir: hdfs:///flink/ha/ +# zookeeper: +# # The list of ZooKeeper quorum peers that coordinate the high-availability +# # setup. This must be a list of the form: +# # "host1:clientPort,host2:clientPort,..." (default clientPort: 2181) +# quorum: localhost:2181 +# client: +# # ACL options are based on https://zookeeper.apache.org/doc/r3.1.2/zookeeperProgrammers.html#sc_BuiltinACLSchemes +# # It can be either "creator" (ZOO_CREATE_ALL_ACL) or "open" (ZOO_OPEN_ACL_UNSAFE) +# # The default value is "open" and it can be changed to "creator" if ZK security is enabled +# acl: open + +#============================================================================== +# Fault tolerance and checkpointing +#============================================================================== + +# The backend that will be used to store operator state checkpoints if +# checkpointing is enabled. Checkpointing is enabled when execution.checkpointing.interval > 0. + +# # Execution checkpointing related parameters. Please refer to CheckpointConfig and ExecutionCheckpointingOptions for more details. +# execution: +# checkpointing: +# interval: 3min +# externalized-checkpoint-retention: [DELETE_ON_CANCELLATION, RETAIN_ON_CANCELLATION] +# max-concurrent-checkpoints: 1 +# min-pause: 0 +# mode: [EXACTLY_ONCE, AT_LEAST_ONCE] +# timeout: 10min +# tolerable-failed-checkpoints: 0 +# unaligned: false + +# state: +# backend: +# # Supported backends are 'hashmap', 'rocksdb', or the +# # . +# type: hashmap +# # Flag to enable/disable incremental checkpoints for backends that +# # support incremental checkpoints (like the RocksDB state backend). +# incremental: false +# checkpoints: +# # Directory for checkpoints filesystem, when using any of the default bundled +# # state backends. +# dir: hdfs://namenode-host:port/flink-checkpoints +# savepoints: +# # Default target directory for savepoints, optional. +# dir: hdfs://namenode-host:port/flink-savepoints + +#============================================================================== +# Rest & web frontend +#============================================================================== + +rest: + # The address to which the REST client will connect to + address: localhost + # The address that the REST & web server binds to + # By default, this is localhost, which prevents the REST & web server from + # being able to communicate outside of the machine/container it is running on. + # + # To enable this, set the bind address to one that has access to outside-facing + # network interface, such as 0.0.0.0. + bind-address: localhost + # # The port to which the REST client connects to. If rest.bind-port has + # # not been specified, then the server will bind to this port as well. + # port: 8081 + # # Port range for the REST and web server to bind to. + # bind-port: 8080-8090 + +# web: +# submit: +# # Flag to specify whether job submission is enabled from the web-based +# # runtime monitor. Uncomment to disable. +# enable: false +# cancel: +# # Flag to specify whether job cancellation is enabled from the web-based +# # runtime monitor. Uncomment to disable. +# enable: false + +#============================================================================== +# Advanced +#============================================================================== + +# io: +# tmp: +# # Override the directories for temporary files. 
If not specified, the +# # system-specific Java temporary directory (java.io.tmpdir property) is taken. +# # +# # For framework setups on Yarn, Flink will automatically pick up the +# # containers' temp directories without any need for configuration. +# # +# # Add a delimited list for multiple directories, using the system directory +# # delimiter (colon ':' on unix) or a comma, e.g.: +# # /data1/tmp:/data2/tmp:/data3/tmp +# # +# # Note: Each directory entry is read from and written to by a different I/O +# # thread. You can include the same directory multiple times in order to create +# # multiple I/O threads against that directory. This is for example relevant for +# # high-throughput RAIDs. +# dirs: /tmp + +# classloader: +# resolve: +# # The classloading resolve order. Possible values are 'child-first' (Flink's default) +# # and 'parent-first' (Java's default). +# # +# # Child first classloading allows users to use different dependency/library +# # versions in their application than those in the classpath. Switching back +# # to 'parent-first' may help with debugging dependency issues. +# order: child-first + +# The amount of memory going to the network stack. These numbers usually need +# no tuning. Adjusting them may be necessary in case of an "Insufficient number +# of network buffers" error. The default min is 64MB, the default max is 1GB. +# +# taskmanager: +# memory: +# network: +# fraction: 0.1 +# min: 64mb +# max: 1gb + +#============================================================================== +# Flink Cluster Security Configuration +#============================================================================== + +# Kerberos authentication for various components - Hadoop, ZooKeeper, and connectors - +# may be enabled in four steps: +# 1. configure the local krb5.conf file +# 2. provide Kerberos credentials (either a keytab or a ticket cache w/ kinit) +# 3. make the credentials available to various JAAS login contexts +# 4. configure the connector to use JAAS/SASL + +# # The below configure how Kerberos credentials are provided. A keytab will be used instead of +# # a ticket cache if the keytab path and principal are set. +# security: +# kerberos: +# login: +# use-ticket-cache: true +# keytab: /path/to/kerberos/keytab +# principal: flink-user +# # The configuration below defines which JAAS login contexts +# contexts: Client,KafkaClient + +#============================================================================== +# ZK Security Configuration +#============================================================================== + +# zookeeper: +# sasl: +# # Below configurations are applicable if ZK ensemble is configured for security +# # +# # Override below configuration to provide custom ZK service name if configured +# # zookeeper.sasl.service-name: zookeeper +# # +# # The configuration below must match one of the values set in "security.kerberos.login.contexts" +# login-context-name: Client + +#============================================================================== +# HistoryServer +#============================================================================== + +# The HistoryServer is started and stopped via bin/historyserver.sh (start|stop) +# +# jobmanager: +# archive: +# fs: +# # Directory to upload completed jobs to. Add this directory to the list of +# # monitored directories of the HistoryServer as well (see below). +# dir: hdfs:///completed-jobs/ + +# historyserver: +# web: +# # The address under which the web-based HistoryServer listens. 
+# address: 0.0.0.0 +# # The port under which the web-based HistoryServer listens. +# port: 8082 +# archive: +# fs: +# # Comma separated list of directories to monitor for completed jobs. +# dir: hdfs:///completed-jobs/ +# # Interval in milliseconds for refreshing the monitored directories. +# fs.refresh-interval: 10000 + diff --git a/flink-dist/src/main/resources/flink-conf.yaml b/flink-dist/src/main/resources/flink-conf.yaml deleted file mode 100644 index b5aa2794dd9d5..0000000000000 --- a/flink-dist/src/main/resources/flink-conf.yaml +++ /dev/null @@ -1,311 +0,0 @@ -################################################################################ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -################################################################################ - -# These parameters are required for Java 17 support. -# They can be safely removed when using Java 8/11. -env.java.opts.all: --add-exports=java.base/sun.net.util=ALL-UNNAMED --add-exports=java.rmi/sun.rmi.registry=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.api=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.file=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.parser=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.tree=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.util=ALL-UNNAMED --add-exports=java.security.jgss/sun.security.krb5=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.text=ALL-UNNAMED --add-opens=java.base/java.time=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.locks=ALL-UNNAMED - -#============================================================================== -# Common -#============================================================================== - -# The external address of the host on which the JobManager runs and can be -# reached by the TaskManagers and any clients which want to connect. This setting -# is only used in Standalone mode and may be overwritten on the JobManager side -# by specifying the --host parameter of the bin/jobmanager.sh executable. -# In high availability mode, if you use the bin/start-cluster.sh script and setup -# the conf/masters file, this will be taken care of automatically. Yarn -# automatically configure the host name based on the hostname of the node where the -# JobManager runs. 
- -jobmanager.rpc.address: localhost - -# The RPC port where the JobManager is reachable. - -jobmanager.rpc.port: 6123 - -# The host interface the JobManager will bind to. By default, this is localhost, and will prevent -# the JobManager from communicating outside the machine/container it is running on. -# On YARN this setting will be ignored if it is set to 'localhost', defaulting to 0.0.0.0. -# On Kubernetes this setting will be ignored, defaulting to 0.0.0.0. -# -# To enable this, set the bind-host address to one that has access to an outside facing network -# interface, such as 0.0.0.0. - -jobmanager.bind-host: localhost - - -# The total process memory size for the JobManager. -# -# Note this accounts for all memory usage within the JobManager process, including JVM metaspace and other overhead. - -jobmanager.memory.process.size: 1600m - -# The host interface the TaskManager will bind to. By default, this is localhost, and will prevent -# the TaskManager from communicating outside the machine/container it is running on. -# On YARN this setting will be ignored if it is set to 'localhost', defaulting to 0.0.0.0. -# On Kubernetes this setting will be ignored, defaulting to 0.0.0.0. -# -# To enable this, set the bind-host address to one that has access to an outside facing network -# interface, such as 0.0.0.0. - -taskmanager.bind-host: localhost - -# The address of the host on which the TaskManager runs and can be reached by the JobManager and -# other TaskManagers. If not specified, the TaskManager will try different strategies to identify -# the address. -# -# Note this address needs to be reachable by the JobManager and forward traffic to one of -# the interfaces the TaskManager is bound to (see 'taskmanager.bind-host'). -# -# Note also that unless all TaskManagers are running on the same machine, this address needs to be -# configured separately for each TaskManager. - -taskmanager.host: localhost - -# The total process memory size for the TaskManager. -# -# Note this accounts for all memory usage within the TaskManager process, including JVM metaspace and other overhead. - -taskmanager.memory.process.size: 1728m - -# To exclude JVM metaspace and overhead, please, use total Flink memory size instead of 'taskmanager.memory.process.size'. -# It is not recommended to set both 'taskmanager.memory.process.size' and Flink memory. -# -# taskmanager.memory.flink.size: 1280m - -# The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline. - -taskmanager.numberOfTaskSlots: 1 - -# The parallelism used for programs that did not specify and other parallelism. - -parallelism.default: 1 - -# The default file system scheme and authority. -# -# By default file paths without scheme are interpreted relative to the local -# root file system 'file:///'. Use this to override the default and interpret -# relative paths relative to a different file system, -# for example 'hdfs://mynamenode:12345' -# -# fs.default-scheme - -#============================================================================== -# High Availability -#============================================================================== - -# The high-availability mode. Possible options are 'NONE' or 'zookeeper'. -# -# high-availability.type: zookeeper - -# The path where metadata for master recovery is persisted. While ZooKeeper stores -# the small ground truth for checkpoint and leader election, this location stores -# the larger objects, like persisted dataflow graphs. 
-# -# Must be a durable file system that is accessible from all nodes -# (like HDFS, S3, Ceph, nfs, ...) -# -# high-availability.storageDir: hdfs:///flink/ha/ - -# The list of ZooKeeper quorum peers that coordinate the high-availability -# setup. This must be a list of the form: -# "host1:clientPort,host2:clientPort,..." (default clientPort: 2181) -# -# high-availability.zookeeper.quorum: localhost:2181 - - -# ACL options are based on https://zookeeper.apache.org/doc/r3.1.2/zookeeperProgrammers.html#sc_BuiltinACLSchemes -# It can be either "creator" (ZOO_CREATE_ALL_ACL) or "open" (ZOO_OPEN_ACL_UNSAFE) -# The default value is "open" and it can be changed to "creator" if ZK security is enabled -# -# high-availability.zookeeper.client.acl: open - -#============================================================================== -# Fault tolerance and checkpointing -#============================================================================== - -# The backend that will be used to store operator state checkpoints if -# checkpointing is enabled. Checkpointing is enabled when execution.checkpointing.interval > 0. -# -# Execution checkpointing related parameters. Please refer to CheckpointConfig and ExecutionCheckpointingOptions for more details. -# -# execution.checkpointing.interval: 3min -# execution.checkpointing.externalized-checkpoint-retention: [DELETE_ON_CANCELLATION, RETAIN_ON_CANCELLATION] -# execution.checkpointing.max-concurrent-checkpoints: 1 -# execution.checkpointing.min-pause: 0 -# execution.checkpointing.mode: [EXACTLY_ONCE, AT_LEAST_ONCE] -# execution.checkpointing.timeout: 10min -# execution.checkpointing.tolerable-failed-checkpoints: 0 -# execution.checkpointing.unaligned: false -# -# Supported backends are 'hashmap', 'rocksdb', or the -# . -# -# state.backend.type: hashmap - -# Directory for checkpoints filesystem, when using any of the default bundled -# state backends. -# -# state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints - -# Default target directory for savepoints, optional. -# -# state.savepoints.dir: hdfs://namenode-host:port/flink-savepoints - -# Flag to enable/disable incremental checkpoints for backends that -# support incremental checkpoints (like the RocksDB state backend). -# -# state.backend.incremental: false - -# The failover strategy, i.e., how the job computation recovers from task failures. -# Only restart tasks that may have been affected by the task failure, which typically includes -# downstream tasks and potentially upstream tasks if their produced data is no longer available for consumption. - -jobmanager.execution.failover-strategy: region - -#============================================================================== -# Rest & web frontend -#============================================================================== - -# The port to which the REST client connects to. If rest.bind-port has -# not been specified, then the server will bind to this port as well. -# -#rest.port: 8081 - -# The address to which the REST client will connect to -# -rest.address: localhost - -# Port range for the REST and web server to bind to. -# -#rest.bind-port: 8080-8090 - -# The address that the REST & web server binds to -# By default, this is localhost, which prevents the REST & web server from -# being able to communicate outside of the machine/container it is running on. -# -# To enable this, set the bind address to one that has access to outside-facing -# network interface, such as 0.0.0.0. 
-# -rest.bind-address: localhost - -# Flag to specify whether job submission is enabled from the web-based -# runtime monitor. Uncomment to disable. - -#web.submit.enable: false - -# Flag to specify whether job cancellation is enabled from the web-based -# runtime monitor. Uncomment to disable. - -#web.cancel.enable: false - -#============================================================================== -# Advanced -#============================================================================== - -# Override the directories for temporary files. If not specified, the -# system-specific Java temporary directory (java.io.tmpdir property) is taken. -# -# For framework setups on Yarn, Flink will automatically pick up the -# containers' temp directories without any need for configuration. -# -# Add a delimited list for multiple directories, using the system directory -# delimiter (colon ':' on unix) or a comma, e.g.: -# /data1/tmp:/data2/tmp:/data3/tmp -# -# Note: Each directory entry is read from and written to by a different I/O -# thread. You can include the same directory multiple times in order to create -# multiple I/O threads against that directory. This is for example relevant for -# high-throughput RAIDs. -# -# io.tmp.dirs: /tmp - -# The classloading resolve order. Possible values are 'child-first' (Flink's default) -# and 'parent-first' (Java's default). -# -# Child first classloading allows users to use different dependency/library -# versions in their application than those in the classpath. Switching back -# to 'parent-first' may help with debugging dependency issues. -# -# classloader.resolve-order: child-first - -# The amount of memory going to the network stack. These numbers usually need -# no tuning. Adjusting them may be necessary in case of an "Insufficient number -# of network buffers" error. The default min is 64MB, the default max is 1GB. -# -# taskmanager.memory.network.fraction: 0.1 -# taskmanager.memory.network.min: 64mb -# taskmanager.memory.network.max: 1gb - -#============================================================================== -# Flink Cluster Security Configuration -#============================================================================== - -# Kerberos authentication for various components - Hadoop, ZooKeeper, and connectors - -# may be enabled in four steps: -# 1. configure the local krb5.conf file -# 2. provide Kerberos credentials (either a keytab or a ticket cache w/ kinit) -# 3. make the credentials available to various JAAS login contexts -# 4. configure the connector to use JAAS/SASL - -# The below configure how Kerberos credentials are provided. A keytab will be used instead of -# a ticket cache if the keytab path and principal are set. 
- -# security.kerberos.login.use-ticket-cache: true -# security.kerberos.login.keytab: /path/to/kerberos/keytab -# security.kerberos.login.principal: flink-user - -# The configuration below defines which JAAS login contexts - -# security.kerberos.login.contexts: Client,KafkaClient - -#============================================================================== -# ZK Security Configuration -#============================================================================== - -# Below configurations are applicable if ZK ensemble is configured for security - -# Override below configuration to provide custom ZK service name if configured -# zookeeper.sasl.service-name: zookeeper - -# The configuration below must match one of the values set in "security.kerberos.login.contexts" -# zookeeper.sasl.login-context-name: Client - -#============================================================================== -# HistoryServer -#============================================================================== - -# The HistoryServer is started and stopped via bin/historyserver.sh (start|stop) - -# Directory to upload completed jobs to. Add this directory to the list of -# monitored directories of the HistoryServer as well (see below). -#jobmanager.archive.fs.dir: hdfs:///completed-jobs/ - -# The address under which the web-based HistoryServer listens. -#historyserver.web.address: 0.0.0.0 - -# The port under which the web-based HistoryServer listens. -#historyserver.web.port: 8082 - -# Comma separated list of directories to monitor for completed jobs. -#historyserver.archive.fs.dir: hdfs:///completed-jobs/ - -# Interval in milliseconds for refreshing the monitored directories. -#historyserver.archive.fs.refresh-interval: 10000 - diff --git a/flink-dist/src/test/java/org/apache/flink/dist/BashJavaUtilsITCase.java b/flink-dist/src/test/java/org/apache/flink/dist/BashJavaUtilsITCase.java index 744ae1dccdd9e..df86e666a2003 100644 --- a/flink-dist/src/test/java/org/apache/flink/dist/BashJavaUtilsITCase.java +++ b/flink-dist/src/test/java/org/apache/flink/dist/BashJavaUtilsITCase.java @@ -18,15 +18,24 @@ package org.apache.flink.dist; +import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.ConfigurationUtils; +import org.apache.flink.configuration.CoreOptions; +import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.runtime.util.bash.BashJavaUtils; +import org.apache.flink.testutils.junit.utils.TempDirUtils; import org.apache.flink.shaded.guava32.com.google.common.collect.Sets; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; +import java.io.File; +import java.io.IOException; +import java.io.PrintWriter; +import java.nio.file.Path; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -49,6 +58,8 @@ class BashJavaUtilsITCase extends JavaBashTestBase { private static final String RUN_EXTRACT_LOGGING_OUTPUTS_SCRIPT = "src/test/bin/runExtractLoggingOutputs.sh"; + @TempDir private Path tmpDir; + @Test void testGetTmResourceParamsConfigs() throws Exception { int expectedResultLines = 2; @@ -122,7 +133,7 @@ void testGetJmResourceParamsWithDynamicProperties() throws Exception { @Test void testGetConfiguration() throws Exception { - int expectedResultLines = 13; + int expectedResultLines = 26; String[] commands = { RUN_BASH_JAVA_UTILS_CMD_SCRIPT, 
BashJavaUtils.Command.UPDATE_AND_GET_FLINK_CONFIGURATION.toString(), @@ -135,7 +146,7 @@ void testGetConfiguration() throws Exception { @Test void testGetConfigurationRemoveKey() throws Exception { - int expectedResultLines = 12; + int expectedResultLines = 24; String[] commands = { RUN_BASH_JAVA_UTILS_CMD_SCRIPT, BashJavaUtils.Command.UPDATE_AND_GET_FLINK_CONFIGURATION.toString(), @@ -146,12 +157,13 @@ void testGetConfigurationRemoveKey() throws Exception { List lines = Arrays.asList(executeScript(commands).split(System.lineSeparator())); assertThat(lines).hasSize(expectedResultLines); - assertThat(lines).doesNotContain("parallelism.default: 1"); + Configuration configuration = loadConfiguration(lines); + assertThat(configuration.getOptional(CoreOptions.DEFAULT_PARALLELISM)).isEmpty(); } @Test void testGetConfigurationRemoveKeyValue() throws Exception { - int expectedResultLines = 12; + int expectedResultLines = 24; String[] commands = { RUN_BASH_JAVA_UTILS_CMD_SCRIPT, BashJavaUtils.Command.UPDATE_AND_GET_FLINK_CONFIGURATION.toString(), @@ -162,12 +174,13 @@ void testGetConfigurationRemoveKeyValue() throws Exception { List lines = Arrays.asList(executeScript(commands).split(System.lineSeparator())); assertThat(lines).hasSize(expectedResultLines); - assertThat(lines).doesNotContain("parallelism.default: 1"); + Configuration configuration = loadConfiguration(lines); + assertThat(configuration.getOptional(CoreOptions.DEFAULT_PARALLELISM)).isEmpty(); } @Test void testGetConfigurationRemoveKeyValueNotMatchingValue() throws Exception { - int expectedResultLines = 13; + int expectedResultLines = 26; String[] commands = { RUN_BASH_JAVA_UTILS_CMD_SCRIPT, BashJavaUtils.Command.UPDATE_AND_GET_FLINK_CONFIGURATION.toString(), @@ -178,12 +191,13 @@ void testGetConfigurationRemoveKeyValueNotMatchingValue() throws Exception { List lines = Arrays.asList(executeScript(commands).split(System.lineSeparator())); assertThat(lines).hasSize(expectedResultLines); - assertThat(lines).contains("parallelism.default: 1"); + Configuration configuration = loadConfiguration(lines); + assertThat(configuration.getOptional(CoreOptions.DEFAULT_PARALLELISM)).hasValue(1); } @Test void testGetConfigurationReplaceKeyValue() throws Exception { - int expectedResultLines = 13; + int expectedResultLines = 26; String[] commands = { RUN_BASH_JAVA_UTILS_CMD_SCRIPT, BashJavaUtils.Command.UPDATE_AND_GET_FLINK_CONFIGURATION.toString(), @@ -194,13 +208,13 @@ void testGetConfigurationReplaceKeyValue() throws Exception { List lines = Arrays.asList(executeScript(commands).split(System.lineSeparator())); assertThat(lines).hasSize(expectedResultLines); - assertThat(lines).doesNotContain("parallelism.default: 1"); - assertThat(lines).contains("parallelism.default: 2"); + Configuration configuration = loadConfiguration(lines); + assertThat(configuration.getOptional(CoreOptions.DEFAULT_PARALLELISM)).hasValue(2); } @Test void testGetConfigurationReplaceKeyValueNotMatchingValue() throws Exception { - int expectedResultLines = 13; + int expectedResultLines = 26; String[] commands = { RUN_BASH_JAVA_UTILS_CMD_SCRIPT, BashJavaUtils.Command.UPDATE_AND_GET_FLINK_CONFIGURATION.toString(), @@ -211,7 +225,8 @@ void testGetConfigurationReplaceKeyValueNotMatchingValue() throws Exception { List lines = Arrays.asList(executeScript(commands).split(System.lineSeparator())); assertThat(lines).hasSize(expectedResultLines); - assertThat(lines).doesNotContain("parallelism.default: 3"); + Configuration configuration = loadConfiguration(lines); + 
assertThat(configuration.getOptional(CoreOptions.DEFAULT_PARALLELISM)).hasValue(1); } private static Map parseAndAssertDynamicParameters( @@ -274,4 +289,16 @@ void testExtractLoggingOutputs() throws Exception { assertThat(actualOutput).isEqualTo(expectedOutput); } + + private Configuration loadConfiguration(List lines) throws IOException { + File file = + TempDirUtils.newFile( + tmpDir.toAbsolutePath(), GlobalConfiguration.FLINK_CONF_FILENAME); + try (final PrintWriter pw = new PrintWriter(file)) { + for (String line : lines) { + pw.println(line); + } + } + return GlobalConfiguration.loadConfiguration(tmpDir.toAbsolutePath().toString()); + } } diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java index 5969f0850d9c7..92c9986e394fc 100644 --- a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java +++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java @@ -418,11 +418,9 @@ public void appendConfiguration(Configuration config) throws IOException { mergedConfig.addAll(defaultConfig); mergedConfig.addAll(config); - // NOTE: Before we change the default conf file in the flink-dist to 'config.yaml', we - // need to use the legacy flink conf file 'flink-conf.yaml' here. Files.write( - conf.resolve(GlobalConfiguration.LEGACY_FLINK_CONF_FILENAME), - ConfigurationUtils.convertConfigToWritableLines(mergedConfig, true)); + conf.resolve(GlobalConfiguration.FLINK_CONF_FILENAME), + ConfigurationUtils.convertConfigToWritableLines(mergedConfig, false)); } public void setTaskExecutorHosts(Collection taskExecutorHosts) throws IOException { diff --git a/flink-end-to-end-tests/test-scripts/common.sh b/flink-end-to-end-tests/test-scripts/common.sh old mode 100644 new mode 100755 index a7c4ce5a68a76..d3452b4b1bb20 --- a/flink-end-to-end-tests/test-scripts/common.sh +++ b/flink-end-to-end-tests/test-scripts/common.sh @@ -50,6 +50,16 @@ TEST_INFRA_DIR=`pwd -P` cd $TEST_ROOT source "${TEST_INFRA_DIR}/common_utils.sh" +source "${FLINK_DIR}/bin/bash-java-utils.sh" + +if [[ -z "${FLINK_CONF_DIR:-}" ]]; then + FLINK_CONF_DIR="$FLINK_DIR/conf" +fi +FLINK_CONF=${FLINK_CONF_DIR}/config.yaml +# Flatten the configuration file config.yaml to enable end-to-end test cases which will modify +# it directly through shell scripts. +output=$(updateAndGetFlinkConfiguration "${FLINK_CONF_DIR}" "${FLINK_DIR}/bin" "${FLINK_DIR}/lib" -flatten) +echo "$output" > $FLINK_CONF NODENAME=${NODENAME:-"localhost"} @@ -143,14 +153,14 @@ function swap_planner_scala_with_planner_loader() { function delete_config_key() { local config_key=$1 - sed -i -e "/^${config_key}: /d" ${FLINK_DIR}/conf/flink-conf.yaml + sed -i -e "/^${config_key}: /d" $FLINK_CONF } function set_config_key() { local config_key=$1 local value=$2 delete_config_key ${config_key} - echo "$config_key: $value" >> $FLINK_DIR/conf/flink-conf.yaml + echo "$config_key: $value" >> $FLINK_CONF } function create_ha_config() { @@ -166,7 +176,7 @@ function create_ha_config() { # This must have all the masters to be used in HA. 
echo "localhost:8081" > ${FLINK_DIR}/conf/masters - # then move on to create the flink-conf.yaml + # then move on to create the config.yaml #============================================================================== # Common #============================================================================== @@ -688,7 +698,7 @@ function setup_flink_slf4j_metric_reporter() { METRIC_NAME_PATTERN="${1:-"*"}" set_config_key "metrics.reporter.slf4j.factory.class" "org.apache.flink.metrics.slf4j.Slf4jReporterFactory" set_config_key "metrics.reporter.slf4j.interval" "1 SECONDS" - set_config_key "metrics.reporter.slf4j.filter.includes" "*:${METRIC_NAME_PATTERN}" + set_config_key "metrics.reporter.slf4j.filter.includes" "'*:${METRIC_NAME_PATTERN}'" } function get_job_exceptions { diff --git a/flink-end-to-end-tests/test-scripts/common_yarn_docker.sh b/flink-end-to-end-tests/test-scripts/common_yarn_docker.sh index 97fcca09d5a2d..ec582c05f88dc 100755 --- a/flink-end-to-end-tests/test-scripts/common_yarn_docker.sh +++ b/flink-end-to-end-tests/test-scripts/common_yarn_docker.sh @@ -131,10 +131,10 @@ security.kerberos.login.principal: hadoop-user slot.request.timeout: 120000 END ) - docker exec master bash -c "echo \"${FLINK_CONFIG}\" > /home/hadoop-user/${FLINK_DIRNAME}/conf/flink-conf.yaml" + docker exec master bash -c "echo \"${FLINK_CONFIG}\" > /home/hadoop-user/${FLINK_DIRNAME}/conf/config.yaml" echo "Flink config:" - docker exec master bash -c "cat /home/hadoop-user/${FLINK_DIRNAME}/conf/flink-conf.yaml" + docker exec master bash -c "cat /home/hadoop-user/${FLINK_DIRNAME}/conf/config.yaml" } function debug_copy_and_show_logs { diff --git a/flink-end-to-end-tests/test-scripts/test_pyflink.sh b/flink-end-to-end-tests/test-scripts/test_pyflink.sh index b9134ebcb4aea..a078e84da6b84 100755 --- a/flink-end-to-end-tests/test-scripts/test_pyflink.sh +++ b/flink-end-to-end-tests/test-scripts/test_pyflink.sh @@ -67,9 +67,13 @@ on_exit test_clean_up cp -r "${FLINK_DIR}/conf" "${TEST_DATA_DIR}/conf" -echo "taskmanager.memory.task.off-heap.size: 768m" >> "${TEST_DATA_DIR}/conf/flink-conf.yaml" -echo "taskmanager.memory.process.size: 3172m" >> "${TEST_DATA_DIR}/conf/flink-conf.yaml" -echo "taskmanager.numberOfTaskSlots: 5" >> "${TEST_DATA_DIR}/conf/flink-conf.yaml" +# standard yaml do not allow duplicate keys +sed -i -e "/^taskmanager.memory.task.off-heap.size: /d" "${TEST_DATA_DIR}/conf/config.yaml" +sed -i -e "/^taskmanager.memory.process.size: /d" "${TEST_DATA_DIR}/conf/config.yaml" +sed -i -e "/^taskmanager.numberOfTaskSlots: /d" "${TEST_DATA_DIR}/conf/config.yaml" +echo "taskmanager.memory.task.off-heap.size: 768m" >> "${TEST_DATA_DIR}/conf/config.yaml" +echo "taskmanager.memory.process.size: 3172m" >> "${TEST_DATA_DIR}/conf/config.yaml" +echo "taskmanager.numberOfTaskSlots: 5" >> "${TEST_DATA_DIR}/conf/config.yaml" export FLINK_CONF_DIR="${TEST_DATA_DIR}/conf" FLINK_PYTHON_DIR=`cd "${CURRENT_DIR}/../../flink-python" && pwd -P` diff --git a/flink-end-to-end-tests/test-scripts/test_yarn_application_kerberos_docker.sh b/flink-end-to-end-tests/test-scripts/test_yarn_application_kerberos_docker.sh index 1274a38851c9e..bb9e456a06d3c 100755 --- a/flink-end-to-end-tests/test-scripts/test_yarn_application_kerberos_docker.sh +++ b/flink-end-to-end-tests/test-scripts/test_yarn_application_kerberos_docker.sh @@ -81,7 +81,7 @@ if [[ ! 
"${YARN_APPLICATION_LOGS}" =~ "Receive initial delegation tokens from re fi echo "Running Job without configured keytab, the exception you see below is expected" -docker exec master bash -c "echo \"\" > /home/hadoop-user/${FLINK_DIRNAME}/conf/flink-conf.yaml" +docker exec master bash -c "echo \"\" > /home/hadoop-user/${FLINK_DIRNAME}/conf/config.yaml" # verify that it doesn't work if we don't configure a keytab docker exec master bash -c "export HADOOP_CLASSPATH=\`hadoop classpath\` && \ /home/hadoop-user/${FLINK_DIRNAME}/bin/flink run-application \ diff --git a/flink-end-to-end-tests/test-scripts/test_yarn_job_kerberos_docker.sh b/flink-end-to-end-tests/test-scripts/test_yarn_job_kerberos_docker.sh index f80e063522462..37e839103a9dc 100755 --- a/flink-end-to-end-tests/test-scripts/test_yarn_job_kerberos_docker.sh +++ b/flink-end-to-end-tests/test-scripts/test_yarn_job_kerberos_docker.sh @@ -66,7 +66,7 @@ else fi echo "Running Job without configured keytab, the exception you see below is expected" -docker exec master bash -c "echo \"\" > /home/hadoop-user/${FLINK_DIRNAME}/conf/flink-conf.yaml" +docker exec master bash -c "echo \"\" > /home/hadoop-user/${FLINK_DIRNAME}/conf/config.yaml" # verify that it doesn't work if we don't configure a keytab docker exec master bash -c "export HADOOP_CLASSPATH=\`hadoop classpath\` && \ /home/hadoop-user/${FLINK_DIRNAME}/bin/flink run \ diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetVectorizedInputFormat.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetVectorizedInputFormat.java index 31e89dacac4c0..badd86072d83c 100644 --- a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetVectorizedInputFormat.java +++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetVectorizedInputFormat.java @@ -112,7 +112,7 @@ public ParquetReader createReader(final Configuration config, final SplitT split final long splitLength = split.length(); // Using Flink FileSystem instead of Hadoop FileSystem directly, so we can get the hadoop - // config that create inputFile needed from flink-conf.yaml + // config that create inputFile needed from config.yaml final FileSystem fs = filePath.getFileSystem(); final ParquetInputFile inputFile = new ParquetInputFile(fs.open(filePath), fs.getFileStatus(filePath).getLen()); diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java index d67893e4d21c2..3a12968663e10 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java @@ -1564,7 +1564,7 @@ public SortPartitionOperator sortPartition(KeySelector keyExtractor * dataset.writeAsText("file:///path1"); } *
 *   <li>A directory is always created when fs.output.always-create-directory
-     *           is set to true in flink-conf.yaml file, even when parallelism is set to 1.
+     *           is set to true in config.yaml file, even when parallelism is set to 1.
 *       {@code .
 *           └── path1/
 *               └── 1 }
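The javadoc above distinguishes three layouts for a sink path. A minimal sketch of that rule (illustration only, not Flink code; `output_paths` and its arguments are hypothetical names):

```python
# Sketch of the sink-path rule from the DataSet#writeAsText javadoc above:
# a directory of numbered part files is produced when parallelism > 1, or
# whenever fs.output.always-create-directory is true in config.yaml.
def output_paths(base: str, parallelism: int, always_create_directory: bool):
    if parallelism > 1 or always_create_directory:
        # One part file per parallel subtask, named 1..n under the base path.
        return [f"{base}/{i}" for i in range(1, parallelism + 1)]
    # Parallelism 1 without the option: a single file at the base path.
    return [base]

print(output_paths("file:///path1", 2, False))  # ['file:///path1/1', 'file:///path1/2']
print(output_paths("file:///path1", 1, False))  # ['file:///path1']
print(output_paths("file:///path1", 1, True))   # ['file:///path1/1']
```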
    diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java index 27382e41be45a..8165ed2e5d256 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java @@ -301,7 +301,7 @@ public class KubernetesConfigOptions { .stringType() .defaultValue("/opt/flink/conf") .withDescription( - "The flink conf directory that will be mounted in pod. The flink-conf.yaml, log4j.properties, " + "The flink conf directory that will be mounted in pod. The config.yaml, log4j.properties, " + "logback.xml in this path will be overwritten from config map."); public static final ConfigOption FLINK_LOG_DIR = diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java index 2203770e2670d..3c3ee9a5154b7 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java @@ -64,7 +64,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull; /** - * Mounts the log4j.properties, logback.xml, and flink-conf.yaml configuration on the JobManager or + * Mounts the log4j.properties, logback.xml, and config.yaml configuration on the JobManager or * TaskManager pod. */ public class FlinkConfMountDecorator extends AbstractKubernetesStepDecorator { diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesParameters.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesParameters.java index c6b335449a885..21afbefc2875d 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesParameters.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesParameters.java @@ -78,7 +78,7 @@ public interface KubernetesParameters { */ List> getTolerations(); - /** Directory in Pod that stores the flink-conf.yaml, log4j.properties, and the logback.xml. */ + /** Directory in Pod that stores the config.yaml, log4j.properties, and the logback.xml. */ String getFlinkConfDirInPod(); /** Directory in Pod that saves the log files. 
*/ diff --git a/flink-python/dev/dev-requirements.txt b/flink-python/dev/dev-requirements.txt index 2d74afd58c2de..f692a63f0f81b 100755 --- a/flink-python/dev/dev-requirements.txt +++ b/flink-python/dev/dev-requirements.txt @@ -32,3 +32,4 @@ pemja==0.4.1; platform_system != 'Windows' httplib2>=0.19.0 protobuf>=3.19.0 pytest~=7.0 +pyyaml>=6.0.1 diff --git a/flink-python/pyflink/common/configuration.py b/flink-python/pyflink/common/configuration.py index 54e82962f4321..35416b8fa471b 100644 --- a/flink-python/pyflink/common/configuration.py +++ b/flink-python/pyflink/common/configuration.py @@ -69,10 +69,21 @@ def set_string(self, key: str, value: str) -> 'Configuration': jars_key = jvm.org.apache.flink.configuration.PipelineOptions.JARS.key() classpaths_key = jvm.org.apache.flink.configuration.PipelineOptions.CLASSPATHS.key() if key in [jars_key, classpaths_key]: - add_jars_to_context_class_loader(value.split(";")) + jar_urls = Configuration.parse_jars_value(value, jvm) + add_jars_to_context_class_loader(jar_urls) self._j_configuration.setString(key, value) return self + @staticmethod + def parse_jars_value(value: str, jvm): + is_standard_yaml = jvm.org.apache.flink.configuration.GlobalConfiguration.isStandardYaml() + if is_standard_yaml: + import yaml + jar_urls_list = yaml.safe_load(value) + if isinstance(jar_urls_list, list): + return jar_urls_list + return value.split(";") + def get_integer(self, key: str, default_value: int) -> int: """ Returns the value associated with the given key as an integer. diff --git a/flink-python/pyflink/common/restart_strategy.py b/flink-python/pyflink/common/restart_strategy.py index b1c4621bf3b1d..52962e138e64c 100644 --- a/flink-python/pyflink/common/restart_strategy.py +++ b/flink-python/pyflink/common/restart_strategy.py @@ -152,7 +152,7 @@ class FallbackRestartStrategyConfiguration(RestartStrategyConfiguration): """ Restart strategy configuration that could be used by jobs to use cluster level restart strategy. Useful especially when one has a custom implementation of restart strategy set via - flink-conf.yaml. + config.yaml. """ def __init__(self, j_restart_strategy=None): diff --git a/flink-python/pyflink/datastream/state_backend.py b/flink-python/pyflink/datastream/state_backend.py index 6ffe21912be9a..7b4c69917bbae 100644 --- a/flink-python/pyflink/datastream/state_backend.py +++ b/flink-python/pyflink/datastream/state_backend.py @@ -280,7 +280,7 @@ def set_predefined_options(self, options: 'PredefinedOptions'): Sets the predefined options for RocksDB. If user-configured options within ``RocksDBConfigurableOptions`` is set (through - flink-conf.yaml) or a user-defined options factory is set (via :func:`setOptions`), + config.yaml) or a user-defined options factory is set (via :func:`setOptions`), then the options from the factory are applied on top of the here specified predefined options and customized options. @@ -301,7 +301,7 @@ def get_predefined_options(self) -> 'PredefinedOptions': are :data:`PredefinedOptions.DEFAULT`. If user-configured options within ``RocksDBConfigurableOptions`` is set (through - flink-conf.yaml) or a user-defined options factory is set (via :func:`setOptions`), + config.yaml) or a user-defined options factory is set (via :func:`setOptions`), then the options from the factory are applied on top of the predefined and customized options. 
@@ -320,7 +320,7 @@ def set_options(self, options_factory_class_name: str): The options created by the factory here are applied on top of the pre-defined options profile selected via :func:`set_predefined_options` and user-configured - options from configuration set through flink-conf.yaml with keys in + options from configuration set through config.yaml with keys in ``RocksDBConfigurableOptions``. :param options_factory_class_name: The fully-qualified class name of the options @@ -383,7 +383,7 @@ class MemoryStateBackend(StateBackend): >> env.set_state_backend(HashMapStateBackend()) >> env.get_checkpoint_config().set_checkpoint_storage(JobManagerCheckpointStorage()) - If you are configuring your state backend via the `flink-conf.yaml` please make the following + If you are configuring your state backend via the `config.yaml` please make the following changes. ``` @@ -535,7 +535,7 @@ class FsStateBackend(StateBackend): >> env.set_state_backend(HashMapStateBackend()) >> env.get_checkpoint_config().set_checkpoint_storage("hdfs://checkpoints") - If you are configuring your state backend via the `flink-conf.yaml` please set your state + If you are configuring your state backend via the `config.yaml` please set your state backend type to `hashmap`. This state backend holds the working state in the memory (JVM heap) of the TaskManagers. @@ -717,7 +717,7 @@ class RocksDBStateBackend(StateBackend): >> env.set_state_backend(EmbeddedRocksDBStateBackend()) >> env.get_checkpoint_config().set_checkpoint_storage("hdfs://checkpoints") - If you are configuring your state backend via the `flink-conf.yaml` no changes are required. + If you are configuring your state backend via the `config.yaml` no changes are required. A State Backend that stores its state in ``RocksDB``. This state backend can store very large state that exceeds memory and spills to disk. @@ -862,7 +862,7 @@ def set_predefined_options(self, options: 'PredefinedOptions'): Sets the predefined options for RocksDB. If user-configured options within ``RocksDBConfigurableOptions`` is set (through - flink-conf.yaml) or a user-defined options factory is set (via :func:`setOptions`), + config.yaml) or a user-defined options factory is set (via :func:`setOptions`), then the options from the factory are applied on top of the here specified predefined options and customized options. @@ -882,7 +882,7 @@ def get_predefined_options(self) -> 'PredefinedOptions': are :data:`PredefinedOptions.DEFAULT`. If user-configured options within ``RocksDBConfigurableOptions`` is set (through - flink-conf.yaml) or a user-defined options factory is set (via :func:`setOptions`), + config.yaml) or a user-defined options factory is set (via :func:`setOptions`), then the options from the factory are applied on top of the predefined and customized options. diff --git a/flink-python/pyflink/datastream/stream_execution_environment.py b/flink-python/pyflink/datastream/stream_execution_environment.py index d1b7ce4111853..20fcf85d7ed9f 100644 --- a/flink-python/pyflink/datastream/stream_execution_environment.py +++ b/flink-python/pyflink/datastream/stream_execution_environment.py @@ -842,7 +842,7 @@ def get_execution_environment(configuration: Configuration = None) \ method returns a local execution environment. When executed from the command line the given configuration is stacked on top of the - global configuration which comes from the flink-conf.yaml, potentially overriding + global configuration which comes from the config.yaml, potentially overriding duplicated options. 
:param configuration: The configuration to instantiate the environment with.
diff --git a/flink-python/pyflink/pyflink_gateway_server.py b/flink-python/pyflink/pyflink_gateway_server.py
index dcb3f34884eb2..09b77231ef7b6 100644
--- a/flink-python/pyflink/pyflink_gateway_server.py
+++ b/flink-python/pyflink/pyflink_gateway_server.py
@@ -43,29 +43,43 @@ def on_windows():
     return platform.system() == "Windows"
 
 
-def read_from_config(key, default_value, flink_conf_file):
-    value = default_value
-    # get the realpath of tainted path value to avoid CWE22 problem that constructs a path or URI
-    # using the tainted value and might allow an attacker to access, modify, or test the existence
-    # of critical or sensitive files.
-    with open(os.path.realpath(flink_conf_file), "r") as f:
-        while True:
-            line = f.readline()
-            if not line:
-                break
-            if line.startswith("#") or len(line.strip()) == 0:
-                continue
-            k, v = line.split(":", 1)
-            if k.strip() == key:
-                value = v.strip()
-    return value
+def read_from_config(key, default_value, flink_conf_directory):
+    import yaml
+    # Try to find the legacy flink-conf.yaml in flink_conf_directory first
+    flink_conf_file = os.path.join(flink_conf_directory, "flink-conf.yaml")
+    if os.path.isfile(flink_conf_file):
+        # If flink-conf.yaml exists, use the old parsing logic to read the value
+        # get the realpath of tainted path value to avoid CWE22 problem that constructs a path
+        # or URI using the tainted value and might allow an attacker to access, modify, or test
+        # the existence of critical or sensitive files.
+        with open(os.path.realpath(flink_conf_file), "r") as f:
+            while True:
+                line = f.readline()
+                if not line:
+                    break
+                if line.startswith("#") or len(line.strip()) == 0:
+                    continue
+                k, v = line.split(":", 1)
+                if k.strip() == key:
+                    return v.strip()
+    else:
+        # If flink-conf.yaml does not exist, try to find config.yaml instead
+        config_file = os.path.join(flink_conf_directory, "config.yaml")
+        if os.path.isfile(config_file):
+            # If config.yaml exists, use a YAML parser to read the value; safe_load
+            # returns None for an empty file, so fall back to an empty mapping
+            with open(os.path.realpath(config_file), "r") as f:
+                config = yaml.safe_load(f) or {}
+            return config.get(key, default_value)
+
+    # Key not found, or neither file exists: return the default value
+    return default_value
 
 
 def find_java_executable():
     java_executable = "java.exe" if on_windows() else "java"
     flink_home = _find_flink_home()
-    flink_conf_file = os.path.join(flink_home, "conf", "flink-conf.yaml")
-    java_home = read_from_config(KEY_ENV_JAVA_HOME, None, flink_conf_file)
+    flink_conf_directory = os.path.join(flink_home, "conf")
+    java_home = read_from_config(KEY_ENV_JAVA_HOME, None, flink_conf_directory)
 
     if java_home is None and "JAVA_HOME" in os.environ:
         java_home = os.environ["JAVA_HOME"]
@@ -120,13 +134,12 @@ def construct_log_settings(env):
     flink_home = os.path.realpath(_find_flink_home())
     flink_conf_dir = env['FLINK_CONF_DIR']
-    flink_conf_file = os.path.join(env['FLINK_CONF_DIR'], "flink-conf.yaml")
 
     if "FLINK_LOG_DIR" in env:
         flink_log_dir = env["FLINK_LOG_DIR"]
     else:
         flink_log_dir = read_from_config(
-            KEY_ENV_LOG_DIR, os.path.join(flink_home, "log"), flink_conf_file)
+            KEY_ENV_LOG_DIR, os.path.join(flink_home, "log"), env['FLINK_CONF_DIR'])
 
     if "LOG4J_PROPERTIES" in env:
         log4j_properties = env["LOG4J_PROPERTIES"]
@@ -156,14 +169,13 @@ def construct_log_settings(env):
 
 
 def get_jvm_opts(env):
-    flink_conf_file = os.path.join(env['FLINK_CONF_DIR'], "flink-conf.yaml")
     jvm_opts = env.get("FLINK_ENV_JAVA_OPTS")
     if jvm_opts is None:
-        default_jvm_opts = read_from_config(KEY_ENV_JAVA_DEFAULT_OPTS, "", 
flink_conf_file) + default_jvm_opts = read_from_config(KEY_ENV_JAVA_DEFAULT_OPTS, "", env['FLINK_CONF_DIR']) extra_jvm_opts = read_from_config( KEY_ENV_JAVA_OPTS, - read_from_config(KEY_ENV_JAVA_OPTS_DEPRECATED, "", flink_conf_file), - flink_conf_file) + read_from_config(KEY_ENV_JAVA_OPTS_DEPRECATED, "", env['FLINK_CONF_DIR']), + env['FLINK_CONF_DIR']) jvm_opts = default_jvm_opts + " " + extra_jvm_opts # Remove leading and trailing double quotes (if present) of value @@ -194,8 +206,6 @@ def construct_flink_classpath(env): def construct_hadoop_classpath(env): - flink_conf_file = os.path.join(env['FLINK_CONF_DIR'], "flink-conf.yaml") - hadoop_conf_dir = "" if 'HADOOP_CONF_DIR' not in env and 'HADOOP_CLASSPATH' not in env: if os.path.isdir("/etc/hadoop/conf"): @@ -212,11 +222,11 @@ def construct_hadoop_classpath(env): return os.pathsep.join( [env.get("HADOOP_CLASSPATH", ""), env.get("YARN_CONF_DIR", - read_from_config(KEY_ENV_YARN_CONF_DIR, "", flink_conf_file)), + read_from_config(KEY_ENV_YARN_CONF_DIR, "", env['FLINK_CONF_DIR'])), env.get("HADOOP_CONF_DIR", - read_from_config(KEY_ENV_HADOOP_CONF_DIR, hadoop_conf_dir, flink_conf_file)), + read_from_config(KEY_ENV_HADOOP_CONF_DIR, hadoop_conf_dir, env['FLINK_CONF_DIR'])), env.get("HBASE_CONF_DIR", - read_from_config(KEY_ENV_HBASE_CONF_DIR, hbase_conf_dir, flink_conf_file))]) + read_from_config(KEY_ENV_HBASE_CONF_DIR, hbase_conf_dir, env['FLINK_CONF_DIR']))]) def construct_test_classpath(env): diff --git a/flink-python/pyflink/table/table_config.py b/flink-python/pyflink/table/table_config.py index bd12a447823bf..ba17767c7769d 100644 --- a/flink-python/pyflink/table/table_config.py +++ b/flink-python/pyflink/table/table_config.py @@ -36,7 +36,7 @@ class TableConfig(object): This class is a pure API class that abstracts configuration from various sources. 
Currently, configuration can be set in any of the following layers (in the given order): - - flink-conf.yaml + - config.yaml - CLI parameters - :class:`~pyflink.datastream.StreamExecutionEnvironment` when bridging to DataStream API - :func:`~EnvironmentSettings.Builder.with_configuration` @@ -106,8 +106,8 @@ def set(self, key: str, value: str) -> 'TableConfig': jars_key = jvm.org.apache.flink.configuration.PipelineOptions.JARS.key() classpaths_key = jvm.org.apache.flink.configuration.PipelineOptions.CLASSPATHS.key() if key in [jars_key, classpaths_key]: - add_jars_to_context_class_loader(value.split(";")) - + jar_urls = Configuration.parse_jars_value(value, jvm) + add_jars_to_context_class_loader(jar_urls) return self def get_local_timezone(self) -> str: diff --git a/flink-python/pyflink/table/table_environment.py b/flink-python/pyflink/table/table_environment.py index 3327261ac41e7..ebfcc7cb095dc 100644 --- a/flink-python/pyflink/table/table_environment.py +++ b/flink-python/pyflink/table/table_environment.py @@ -1540,23 +1540,33 @@ def _set_python_executable_for_local_executor(self): j_config.setString(jvm.PythonOptions.PYTHON_EXECUTABLE.key(), sys.executable) def _add_jars_to_j_env_config(self, config_key): - jvm = get_gateway().jvm jar_urls = self.get_config().get(config_key, None) - if jar_urls is not None: - # normalize + + if jar_urls: + jvm = get_gateway().jvm jar_urls_list = [] - for url in jar_urls.split(";"): - url = url.strip() - if url != "": - jar_urls_list.append(jvm.java.net.URL(url).toString()) + parsed_jar_urls = Configuration.parse_jars_value(jar_urls, jvm) + url_strings = [ + jvm.java.net.URL(url).toString() if url else "" + for url in parsed_jar_urls + ] + self._parse_urls(url_strings, jar_urls_list) + j_configuration = get_j_env_configuration(self._get_j_env()) - if j_configuration.containsKey(config_key): - for url in j_configuration.getString(config_key, "").split(";"): - url = url.strip() - if url != "" and url not in jar_urls_list: - jar_urls_list.append(url) + parsed_jar_urls = Configuration.parse_jars_value( + j_configuration.getString(config_key, ""), + jvm + ) + self._parse_urls(parsed_jar_urls, jar_urls_list) + j_configuration.setString(config_key, ";".join(jar_urls_list)) + def _parse_urls(self, jar_urls, jar_urls_list): + for url in jar_urls: + url = url.strip() + if url != "" and url not in jar_urls_list: + jar_urls_list.append(url) + def _get_j_env(self): return self._j_tenv.getPlanner().getExecEnv() diff --git a/flink-python/setup.py b/flink-python/setup.py index 7161354be9f81..24637fe4c7d3a 100644 --- a/flink-python/setup.py +++ b/flink-python/setup.py @@ -325,7 +325,9 @@ def extracted_output_files(base_dir, file_path, output_directory): 'pandas>=1.3.0', 'pyarrow>=5.0.0', 'pemja==0.4.1;platform_system != "Windows"', - 'httplib2>=0.19.0', apache_flink_libraries_dependency] + 'httplib2>=0.19.0', + 'pyyaml>=6.0.1', + apache_flink_libraries_dependency] setup( name='apache-flink', diff --git a/flink-python/src/main/java/org/apache/flink/python/PythonConfig.java b/flink-python/src/main/java/org/apache/flink/python/PythonConfig.java index d7959c425e9e6..c58267e6fc945 100644 --- a/flink-python/src/main/java/org/apache/flink/python/PythonConfig.java +++ b/flink-python/src/main/java/org/apache/flink/python/PythonConfig.java @@ -48,7 +48,7 @@ public class PythonConfig implements ReadableConfig { } /** - * Configuration adopted from the outer layer, e.g. flink-conf.yaml, command line arguments, + * Configuration adopted from the outer layer, e.g. 
config.yaml, command line arguments,
      * TableConfig, etc.
      */
     private final ReadableConfig configuration;
diff --git a/flink-python/src/main/java/org/apache/flink/python/PythonOptions.java b/flink-python/src/main/java/org/apache/flink/python/PythonOptions.java
index 17f425cebb885..142457bd19ccd 100644
--- a/flink-python/src/main/java/org/apache/flink/python/PythonOptions.java
+++ b/flink-python/src/main/java/org/apache/flink/python/PythonOptions.java
@@ -182,7 +182,7 @@ public class PythonOptions {
                                     .text("2. the command line option \"-pyclientexec\";")
                                     .linebreak()
                                     .text(
-                                            "3. the configuration 'python.client.executable' defined in flink-conf.yaml")
+                                            "3. the configuration 'python.client.executable' defined in config.yaml")
                                     .linebreak()
                                     .text("4. the environment variable PYFLINK_CLIENT_EXECUTABLE;")
                                     .build());
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartBackoffTimeStrategyFactoryLoader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartBackoffTimeStrategyFactoryLoader.java
index d0185c5f0ee85..8ea11a65ba31b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartBackoffTimeStrategyFactoryLoader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartBackoffTimeStrategyFactoryLoader.java
@@ -46,8 +46,8 @@ private RestartBackoffTimeStrategyFactoryLoader() {}
      *
 *   <li>Strategy set within job graph, i.e. {@link
 *       RestartStrategies.RestartStrategyConfiguration}, unless the config is {@link
 *       RestartStrategies.FallbackRestartStrategyConfiguration}.
- *   <li>Strategy set in the cluster(server-side) config (flink-conf.yaml), unless the strategy
- *       is not specified
+ *   <li>Strategy set in the cluster(server-side) config (config.yaml), unless the strategy is
+ *       not specified
 *   <li>{@link
 *       FixedDelayRestartBackoffTimeStrategy.FixedDelayRestartBackoffTimeStrategyFactory} if
 *       checkpointing is enabled. Otherwise {@link
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageLoader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageLoader.java
index ea4f007449b30..df4809714ef55 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageLoader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageLoader.java
@@ -79,7 +79,7 @@ public static Optional fromConfig(
         if (logger != null) {
             logger.debug(
                     "The configuration {} has not be set in the current"
-                            + " sessions flink-conf.yaml. Falling back to a default CheckpointStorage"
+                            + " session's config.yaml. Falling back to a default CheckpointStorage"
                             + " type. Users are strongly encouraged explicitly set this configuration"
                             + " so they understand how their applications are checkpointing"
                             + " snapshots for fault-tolerance.",
@@ -138,7 +138,7 @@ public static Optional fromConfig(
      * StreamExecutionEnvironment}.
      *
      * <p>3) Use the {@link CheckpointStorage} instance configured via the clusters
-     * flink-conf.yaml.
+     * config.yaml.
      *
 * <p>4) Load a default {@link CheckpointStorage} instance.
 *
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
index aded04d7b6b2e..ec219432faae4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
@@ -67,7 +67,7 @@
  * env.getCheckpointConfig().setCheckpointStorage("hdfs:///checkpoints");
  * }
 *
- * <p>If you are configuring your state backend via the {@code flink-conf.yaml} please make the
+ * <p>If you are configuring your state backend via the {@code config.yaml} please make the
 * following changes set your state backend type to "hashmap" {@code state.backend.type: hashmap}.
 *
 * <p>This state backend holds the working state in the memory (JVM heap) of the TaskManagers. The
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
index 87d8d77803eb5..6345c2b4602b9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
@@ -59,7 +59,7 @@
  * env.getCheckpointConfig().setCheckpointStorage(new JobManagerCheckpointStorage());
  * }
 *
- * <p>If you are configuring your state backend via the {@code flink-conf.yaml} please make the
+ * <p>If you are configuring your state backend via the {@code config.yaml} please make the
 * following changes:
 *
    {@code
    diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java
    index 048515b5b449c..6d15c53c1a287 100644
    --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java
    +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java
    @@ -698,7 +698,7 @@ public void setPriorityQueueStateType(PriorityQueueStateType priorityQueueStateT
          * Sets the predefined options for RocksDB.
          *
 * <p>If user-configured options within {@link RocksDBConfigurableOptions} is set (through
- * flink-conf.yaml) or a user-defined options factory is set (via {@link
+ * config.yaml) or a user-defined options factory is set (via {@link
 * #setRocksDBOptions(RocksDBOptionsFactory)}), then the options from the factory are applied on
 * top of the here specified predefined options and customized options.
 *
@@ -714,7 +714,7 @@ public void setPredefinedOptions(@Nonnull PredefinedOptions options) {
      * PredefinedOptions#DEFAULT}.
      *
      * <p>If user-configured options within {@link RocksDBConfigurableOptions} is set (through
-     * flink-conf.yaml) of a user-defined options factory is set (via {@link
+     * config.yaml) or a user-defined options factory is set (via {@link
      * #setRocksDBOptions(RocksDBOptionsFactory)}), then the options from the factory are applied on
      * top of the predefined and customized options.
      *
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptions.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptions.java
index 69576520fc387..e78d9fb0eb566 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptions.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptions.java
@@ -145,7 +145,7 @@ public class RocksDBOptions {
                                     + "This option only takes effect if neither '%s' nor '%s' are not configured. If none is configured "
                                     + "then each RocksDB column family state has its own memory caches (as controlled by the column "
                                     + "family options). "
-                                    + "The relevant options for the shared resources (e.g. write-buffer-ratio) can be set on the same level (flink-conf.yaml)."
+                                    + "The relevant options for the shared resources (e.g. write-buffer-ratio) can be set on the same level (config.yaml)."
                                     + "Note, that this feature breaks resource isolation between the slots",
                             USE_MANAGED_MEMORY.key(),
                             FIX_PER_SLOT_MEMORY_SIZE.key()));
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index 0c37a1e6bc5a9..dd3fcb70cf4ec 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -61,8 +61,7 @@
  * env.getCheckpointConfig().setCheckpointStorage("hdfs://checkpoints");
  * }
 *
- * <p>If you are configuring your state backend via the {@code flink-conf.yaml} no changes are
- * required.
+ * <p>If you are configuring your state backend via the {@code config.yaml} no changes are required.
 *
 *
 * <p>A State Backend that stores its state in {@code RocksDB}. This state backend can store very
 * large state that exceeds memory and spills to disk.
@@ -399,7 +398,7 @@ public void setPriorityQueueStateType(PriorityQueueStateType priorityQueueStateT
      * Sets the predefined options for RocksDB.
      *
      * <p>If user-configured options within {@link RocksDBConfigurableOptions} is set (through
-     * flink-conf.yaml) or a user-defined options factory is set (via {@link
+     * config.yaml) or a user-defined options factory is set (via {@link
      * #setRocksDBOptions(RocksDBOptionsFactory)}), then the options from the factory are applied on
      * top of the here specified predefined options and customized options.
      *
@@ -415,7 +414,7 @@ public void setPredefinedOptions(@Nonnull PredefinedOptions options) {
      * PredefinedOptions#DEFAULT}.
      *
      * <p>If user-configured options within {@link RocksDBConfigurableOptions} is set (through
-     * flink-conf.yaml) of a user-defined options factory is set (via {@link
+     * config.yaml) or a user-defined options factory is set (via {@link
      * #setRocksDBOptions(RocksDBOptionsFactory)}), then the options from the factory are applied on
      * top of the predefined and customized options.
      *
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
index 82a42dfdb3cd8..34d27e1bb5915 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
@@ -249,7 +249,7 @@ public void testConfigureRocksDBPriorityQueueFactoryCacheSize() throws Exception
         env.close();
     }
 
-    /** Validates that user custom configuration from code should override the flink-conf.yaml. */
+    /** Validates that user custom configuration from code should override the config.yaml. */
     @Test
     public void testConfigureTimerServiceLoadingFromApplication() throws Exception {
         final MockEnvironment env = new MockEnvironmentBuilder().build();
@@ -524,7 +524,7 @@ public void testPredefinedOptions() throws Exception {
         // verify that we would use PredefinedOptions.DEFAULT by default.
         assertEquals(PredefinedOptions.DEFAULT, rocksDbBackend.getPredefinedOptions());
 
-        // verify that user could configure predefined options via flink-conf.yaml
+        // verify that user could configure predefined options via config.yaml
         Configuration configuration = new Configuration();
         configuration.set(
                 RocksDBOptions.PREDEFINED_OPTIONS, PredefinedOptions.FLASH_SSD_OPTIMIZED.name());
@@ -640,7 +640,7 @@ public void testOptionsFactory() throws Exception {
         String checkpointPath = tempFolder.newFolder().toURI().toString();
         RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(checkpointPath);
 
-        // verify that user-defined options factory could be configured via flink-conf.yaml
+        // verify that user-defined options factory could be configured via config.yaml
         Configuration config = new Configuration();
         config.setString(RocksDBOptions.OPTIONS_FACTORY.key(), TestOptionsFactory.class.getName());
         config.setString(TestOptionsFactory.BACKGROUND_JOBS_OPTION.key(), "4");
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index f14f3d4400a1c..b179ead220d9f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -2591,7 +2591,7 @@ public static StreamExecutionEnvironment getExecutionEnvironment() {
      * execution environment, as returned by {@link #createLocalEnvironment(Configuration)}.
      *
      *

 * <p>When executed from the command line the given configuration is stacked on top of the
-     * global configuration which comes from the {@code flink-conf.yaml}, potentially overriding
+     * global configuration which comes from the {@code config.yaml}, potentially overriding
      * duplicated options.
      *
      * @param configuration The configuration to instantiate the environment with.
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/DefaultContext.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/DefaultContext.java
index 551e05ef962f8..eae416cd0a628 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/DefaultContext.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/DefaultContext.java
@@ -127,8 +127,8 @@ private static CustomCommandLine findActiveCommandLine(
     // -------------------------------------------------------------------------------------------
 
     /**
-     * Build the {@link DefaultContext} from flink-conf.yaml, dynamic configuration and users
-     * specified jars.
+     * Build the {@link DefaultContext} from config.yaml, dynamic configuration and user-specified
+     * jars.
      *
      * @param dynamicConfig user specified configuration.
     * @param dependencies user specified jars
diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/util/SqlGatewayRestEndpointTestUtils.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/util/SqlGatewayRestEndpointTestUtils.java
index 0c3e3ede7a624..65daeb4a3a819 100644
--- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/util/SqlGatewayRestEndpointTestUtils.java
+++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/util/SqlGatewayRestEndpointTestUtils.java
@@ -43,7 +43,7 @@ public static Configuration getBaseConfig(Configuration flinkConf) {
         return rebuildRestEndpointOptions(context.getEndpointOptions());
     }
 
-    /** Create the configuration generated from flink-conf.yaml. */
+    /** Create the configuration generated from config.yaml. */
     public static Configuration getFlinkConfig(
             String address, String bindAddress, String portRange) {
         final Configuration config = new Configuration();
diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/context/SessionContextTest.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/context/SessionContextTest.java
index e5d22ed943e60..8ca455d31b4fa 100644
--- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/context/SessionContextTest.java
+++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/context/SessionContextTest.java
@@ -184,7 +184,7 @@ void testCreateContextWithListeners() {
                         CatalogListener1.class.getName(), CatalogListener2.class.getName()));
 
-        // Find and create listeners from flink-conf.yaml for session
+        // Find and create listeners from config.yaml for session
         flinkConfig.set(
                 TableConfigOptions.TABLE_CATALOG_MODIFICATION_LISTENERS,
                 Arrays.asList(CatalogFactory1.IDENTIFIER, CatalogFactory2.IDENTIFIER));
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java
index f80ad6393ecfa..3d3ffe29cca3f 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java
@@ -50,7 +50,7 @@
 * configuration can be set in any of the following layers (in the given order):
 *
 * <ol>
- *   <li>{@code flink-conf.yaml},
+ *   <li>{@code config.yaml},
 *   <li>CLI parameters,
 *   <li>{@code StreamExecutionEnvironment} when bridging to DataStream API,
 *   <li>{@link EnvironmentSettings.Builder#withConfiguration(Configuration)} / {@link
@@ -103,7 +103,7 @@ public TableConfig() {}
 
     // Note to implementers:
     // TableConfig is a ReadableConfig which is built once the TableEnvironment is created and
-    // contains both the configuration defined in the execution context (flink-conf.yaml + CLI
+    // contains both the configuration defined in the execution context (config.yaml + CLI
     // params), stored in rootConfiguration, but also any extra configuration defined by the user in
     // the application, which has precedence over the execution configuration.
     //
diff --git a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/container/FlinkContainersSettings.java b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/container/FlinkContainersSettings.java
index cd2bee6b39c6b..f3c79c4b35dea 100644
--- a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/container/FlinkContainersSettings.java
+++ b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/container/FlinkContainersSettings.java
@@ -283,8 +283,8 @@ public Builder jarPaths(Collection jarPaths) {
     }
 
     /**
-     * Sets a single Flink configuration parameter (the options for flink-conf.yaml) and returns
-     * a reference to this Builder enabling method chaining.
+     * Sets a single Flink configuration parameter (the options for config.yaml) and returns a
+     * reference to this Builder enabling method chaining.
      *
      * @param The type parameter.
     * @param option The option.
diff --git a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/container/FlinkImageBuilder.java b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/container/FlinkImageBuilder.java
index 38b542e1310cf..1ee0c9f6999ac 100644
--- a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/container/FlinkImageBuilder.java
+++ b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/container/FlinkImageBuilder.java
@@ -125,7 +125,7 @@ public FlinkImageBuilder setJavaVersion(String javaVersion) {
     }
 
     /**
-     * Sets Flink configuration. This configuration will be used for generating flink-conf.yaml for
+     * Sets Flink configuration. This configuration will be used for generating config.yaml for
      * configuring JobManager and TaskManager.
      */
     public FlinkImageBuilder setConfiguration(Configuration conf) {
@@ -209,12 +209,10 @@ public ImageFromDockerfile build() throws ImageBuildException {
         final Path flinkConfFile = createTemporaryFlinkConfFile(conf, tempDirectory);
         final Path log4jPropertiesFile = createTemporaryLog4jPropertiesFile(tempDirectory);
 
-        // Copy flink-conf.yaml into image
-        // NOTE: Before we change the default conf file in the flink-dist to 'config.yaml', we
-        // need to use the legacy flink conf file 'flink-conf.yaml' here.
+ // Copy config.yaml into image filesToCopy.put( flinkConfFile, - Paths.get(flinkHome, "conf", GlobalConfiguration.LEGACY_FLINK_CONF_FILENAME)); + Paths.get(flinkHome, "conf", GlobalConfiguration.FLINK_CONF_FILENAME)); filesToCopy.put( log4jPropertiesFile, Paths.get(flinkHome, "conf", LOG4J_PROPERTIES_FILENAME)); @@ -292,13 +290,10 @@ private String getJavaVersionSuffix() { private Path createTemporaryFlinkConfFile(Configuration finalConfiguration, Path tempDirectory) throws IOException { - // Create a temporary flink-conf.yaml file and write merged configurations into it - // NOTE: Before we change the default conf file in the flink-dist to 'config.yaml', we - // need to use the legacy flink conf file 'flink-conf.yaml' here. - Path flinkConfFile = tempDirectory.resolve(GlobalConfiguration.LEGACY_FLINK_CONF_FILENAME); + Path flinkConfFile = tempDirectory.resolve(GlobalConfiguration.FLINK_CONF_FILENAME); Files.write( flinkConfFile, - ConfigurationUtils.convertConfigToWritableLines(finalConfiguration, true)); + ConfigurationUtils.convertConfigToWritableLines(finalConfiguration, false)); return flinkConfFile; } diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java index fcf32ea13be52..ec7c6905ee7de 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java @@ -821,10 +821,9 @@ private static void start( File flinkConfDirPath = TestUtils.findFile( - // NOTE: Before we change the default conf file in the flink-dist to - // 'config.yaml', we need to use the legacy flink conf file - // 'flink-conf.yaml' here. - flinkDistRootDir, new ContainsName(new String[] {"flink-conf.yaml"})); + flinkDistRootDir, + new ContainsName( + new String[] {GlobalConfiguration.FLINK_CONF_FILENAME})); assertThat(flinkConfDirPath).isNotNull(); final String confDirPath = flinkConfDirPath.getParentFile().getAbsolutePath(); @@ -839,7 +838,8 @@ private static void start( FileUtils.copyDirectory(new File(confDirPath), tempConfPathForSecureRun); BootstrapTools.writeConfiguration( - globalConfiguration, new File(tempConfPathForSecureRun, "flink-conf.yaml")); + globalConfiguration, + new File(tempConfPathForSecureRun, GlobalConfiguration.FLINK_CONF_FILENAME)); String configDir = tempConfPathForSecureRun.getAbsolutePath(); diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java index 4f81077b16e81..5d9c77695e00e 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java @@ -485,7 +485,7 @@ static ContainerLaunchContext createTaskExecutorContext( * * @param flinkConfig The Flink configuration. * @param tmParams Parameters for the task manager. - * @param configDirectory The configuration directory for the flink-conf.yaml + * @param configDirectory The configuration directory for the config.yaml * @param logDirectory The log directory. * @param hasLogback Uses logback? * @param hasLog4j Uses log4j?
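A closing note on why several of the script changes in this patch delete a line before appending a replacement (see test_pyflink.sh above): the legacy reader scanned flink-conf.yaml line by line, so appending a second "key: value" line was a harmless override, while a standard YAML document with duplicate keys is not well-formed. A small demonstration (assumes PyYAML, which this patch adds as a dependency; PyYAML happens to keep the last occurrence rather than rejecting the document):

```python
import yaml  # PyYAML

# Two occurrences of the same key: invalid per the YAML spec, silently
# resolved by PyYAML in favor of the last value. The sed-before-append
# idiom in test_pyflink.sh avoids ever producing such a document.
doc = """
taskmanager.numberOfTaskSlots: 1
taskmanager.numberOfTaskSlots: 5
"""
print(yaml.safe_load(doc))  # {'taskmanager.numberOfTaskSlots': 5}
```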