From ba9c7afbb677f502a4470a602824446c24ef3260 Mon Sep 17 00:00:00 2001 From: xieyang Date: Sat, 8 Jun 2024 17:09:36 +0800 Subject: [PATCH] =?UTF-8?q?=E3=80=90update=E3=80=91add=20config=20'job.sav?= =?UTF-8?q?epoint-create-subdir'=20to=20create=20sub-dir=20named=20by=20jo?= =?UTF-8?q?b=20id=20under=20the=20'state.savepoint.dir'?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../config/FlinkOperatorConfiguration.java | 7 +- .../KubernetesOperatorConfigOptions.java | 7 + .../service/AbstractFlinkService.java | 135 ++++++++++-------- 3 files changed, 85 insertions(+), 64 deletions(-) diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java index 40a6dcb51..eaa818f03 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java @@ -72,6 +72,7 @@ public class FlinkOperatorConfiguration { LeaderElectionConfiguration leaderElectionConfiguration; DeletionPropagation deletionPropagation; boolean savepointOnDeletion; + boolean savepointCreateSubdir; public static FlinkOperatorConfiguration fromConfiguration(Configuration operatorConfig) { Duration reconcileInterval = @@ -185,6 +186,9 @@ public static FlinkOperatorConfiguration fromConfiguration(Configuration operato boolean savepointOnDeletion = operatorConfig.get(KubernetesOperatorConfigOptions.SAVEPOINT_ON_DELETION); + boolean savepointCreateSubdir = + operatorConfig.get(KubernetesOperatorConfigOptions.SAVEPOINT_CREATE_SUBDIR); + return new FlinkOperatorConfiguration( reconcileInterval, reconcilerMaxParallelism, @@ -212,7 +216,8 @@ public static FlinkOperatorConfiguration fromConfiguration(Configuration 
operato labelSelector, getLeaderElectionConfig(operatorConfig), deletionPropagation, - savepointOnDeletion); + savepointOnDeletion, + savepointCreateSubdir); } private static LeaderElectionConfiguration getLeaderElectionConfig(Configuration conf) { diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java index d1fb44221..621e17d06 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java @@ -533,4 +533,11 @@ public static String operatorConfigKey(String key) { .defaultValue(false) .withDescription( "Indicate whether a savepoint must be taken when deleting a FlinkDeployment or FlinkSessionJob."); + @Documentation.Section(SECTION_DYNAMIC) + public static final ConfigOption<Boolean> SAVEPOINT_CREATE_SUBDIR = + operatorConfig("job.savepoint-create-subdir") + .booleanType() + .defaultValue(false) + .withDescription( + "Indicate whether to create sub-directories named by job id under the 'state.savepoints.dir'"); } diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java index 88cb42c54..53d86f963 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java @@ -82,11 +82,7 @@ import org.apache.flink.runtime.webmonitor.handlers.JarUploadHeaders; import org.apache.flink.runtime.webmonitor.handlers.JarUploadResponseBody; import 
org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions; -import org.apache.flink.util.ConfigurationException; -import org.apache.flink.util.FileUtils; -import org.apache.flink.util.FlinkException; -import org.apache.flink.util.FlinkRuntimeException; -import org.apache.flink.util.Preconditions; +import org.apache.flink.util.*; import io.fabric8.kubernetes.api.model.DeletionPropagation; import io.fabric8.kubernetes.api.model.ObjectMeta; @@ -135,6 +131,7 @@ public abstract class AbstractFlinkService implements FlinkService { private static final String EMPTY_JAR_FILENAME = "empty.jar"; public static final String FIELD_NAME_TOTAL_CPU = "total-cpu"; public static final String FIELD_NAME_TOTAL_MEMORY = "total-memory"; + public static final String FORWARD_SLASH = "/"; protected final KubernetesClient kubernetesClient; protected final ExecutorService executorService; @@ -287,29 +284,33 @@ protected void cancelJob( deleteClusterDeployment(deployment.getMetadata(), deploymentStatus, conf, true); break; case SAVEPOINT: - final String savepointDirectory = - Preconditions.checkNotNull( - conf.get(CheckpointingOptions.SAVEPOINT_DIRECTORY)); - final long timeout = - conf.get(ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT) - .getSeconds(); if (ReconciliationUtils.isJobRunning(deploymentStatus)) { try { - LOG.info("Suspending job with savepoint."); - String savepoint = - clusterClient - .stopWithSavepoint( - Preconditions.checkNotNull(jobId), - false, - savepointDirectory, - conf.get(FLINK_VERSION) - .isNewerVersionThan( - FlinkVersion.v1_14) - ? 
savepointFormatType - : null) - .get(timeout, TimeUnit.SECONDS); - savepointOpt = Optional.of(savepoint); - LOG.info("Job successfully suspended with savepoint {}.", savepoint); + if (!StringUtils.isNullOrWhitespaceOnly(conf.get(CheckpointingOptions.SAVEPOINT_DIRECTORY))){ + LOG.info("Suspending job with savepoint."); + final String jobIdSubdir = FORWARD_SLASH + jobId + FORWARD_SLASH; + final String savepointDirectory = operatorConfig.isSavepointCreateSubdir() ? + conf.get(CheckpointingOptions.SAVEPOINT_DIRECTORY) + jobIdSubdir : conf.get(CheckpointingOptions.SAVEPOINT_DIRECTORY); + final long timeout = + conf.get(ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT) + .getSeconds(); + String savepoint = + clusterClient + .stopWithSavepoint( + Preconditions.checkNotNull(jobId), + false, + savepointDirectory, + conf.get(FLINK_VERSION) + .isNewerVersionThan( + FlinkVersion.v1_14) + ? savepointFormatType + : null) + .get(timeout, TimeUnit.SECONDS); + savepointOpt = Optional.of(savepoint); + LOG.info("Job successfully suspended with savepoint {}.", savepoint); + }else { + LOG.info("Job config 'state.savepoints.dir' is null, skipping cancel-with-savepoint"); + } } catch (TimeoutException exception) { throw new FlinkException( String.format( @@ -395,41 +396,45 @@ public void cancelSessionJob( break; case SAVEPOINT: if (ReconciliationUtils.isJobRunning(sessionJobStatus)) { - LOG.info("Suspending job with savepoint."); - final String savepointDirectory = - Preconditions.checkNotNull( - conf.get(CheckpointingOptions.SAVEPOINT_DIRECTORY)); - final long timeout = - conf.get(ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT) - .getSeconds(); - try { - String savepoint = - clusterClient - .stopWithSavepoint( - jobId, - false, - savepointDirectory, - conf.get(FLINK_VERSION) - .isNewerVersionThan( - FlinkVersion.v1_14) - ? 
conf.get( - KubernetesOperatorConfigOptions - .OPERATOR_SAVEPOINT_FORMAT_TYPE) - : null) - .get(timeout, TimeUnit.SECONDS); - savepointOpt = Optional.of(savepoint); - LOG.info( - "Job successfully suspended with savepoint {}.", savepoint); - } catch (TimeoutException exception) { - throw new FlinkException( - String.format( - "Timed out stopping the job %s in Flink cluster %s with savepoint, " - + "please configure a larger timeout via '%s'", - jobId, - clusterId, - ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT - .key()), - exception); + if(!StringUtils.isNullOrWhitespaceOnly(conf.get(CheckpointingOptions.SAVEPOINT_DIRECTORY))){ + LOG.info("Suspending job with savepoint."); + final String jobIdSubdir = FORWARD_SLASH + jobId + FORWARD_SLASH; + final String savepointDirectory = operatorConfig.isSavepointCreateSubdir() ? + conf.get(CheckpointingOptions.SAVEPOINT_DIRECTORY) + jobIdSubdir : conf.get(CheckpointingOptions.SAVEPOINT_DIRECTORY); + final long timeout = + conf.get(ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT) + .getSeconds(); + try { + String savepoint = + clusterClient + .stopWithSavepoint( + jobId, + false, + savepointDirectory, + conf.get(FLINK_VERSION) + .isNewerVersionThan( + FlinkVersion.v1_14) + ? 
conf.get( + KubernetesOperatorConfigOptions + .OPERATOR_SAVEPOINT_FORMAT_TYPE) + : null) + .get(timeout, TimeUnit.SECONDS); + savepointOpt = Optional.of(savepoint); + LOG.info( + "Job successfully suspended with savepoint {}.", savepoint); + } catch (TimeoutException exception) { + throw new FlinkException( + String.format( + "Timed out stopping the job %s in Flink cluster %s with savepoint, " + + "please configure a larger timeout via '%s'", + jobId, + clusterId, + ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT + .key()), + exception); + } + }else { + LOG.info("Job config 'state.savepoints.dir' is null, skipping cancel-with-savepoint"); } } else { throw new RuntimeException( @@ -467,8 +472,12 @@ public void triggerSavepoint( savepointTriggerHeaders.getUnresolvedMessageParameters(); savepointTriggerMessageParameters.jobID.resolve(JobID.fromHexString(jobId)); - var savepointDirectory = - Preconditions.checkNotNull(conf.get(CheckpointingOptions.SAVEPOINT_DIRECTORY)); + var jobIdSubdir = FORWARD_SLASH + jobId + FORWARD_SLASH; + var savepointDirectory = operatorConfig.isSavepointCreateSubdir() ? + Preconditions.checkNotNull( + conf.get(CheckpointingOptions.SAVEPOINT_DIRECTORY)) + jobIdSubdir : + Preconditions.checkNotNull( + conf.get(CheckpointingOptions.SAVEPOINT_DIRECTORY)); var timeout = operatorConfig.getFlinkClientTimeout().getSeconds(); var savepointFormatType = SavepointUtils.getSavepointFormatType(conf);