Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

【update】Add new operator config "savepoint-create-subdir" to allow user to configure whether to create sub-directories named by job id under the 'state.savepoints.dir' #882

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ public class FlinkOperatorConfiguration {
LeaderElectionConfiguration leaderElectionConfiguration;
DeletionPropagation deletionPropagation;
boolean savepointOnDeletion;
boolean savepointCreateSubdir;

public static FlinkOperatorConfiguration fromConfiguration(Configuration operatorConfig) {
Duration reconcileInterval =
Expand Down Expand Up @@ -185,6 +186,9 @@ public static FlinkOperatorConfiguration fromConfiguration(Configuration operato
boolean savepointOnDeletion =
operatorConfig.get(KubernetesOperatorConfigOptions.SAVEPOINT_ON_DELETION);

boolean savepointCreateSubdir =
operatorConfig.get(KubernetesOperatorConfigOptions.SAVEPOINT_CREATE_SUBDIR);

return new FlinkOperatorConfiguration(
reconcileInterval,
reconcilerMaxParallelism,
Expand Down Expand Up @@ -212,7 +216,8 @@ public static FlinkOperatorConfiguration fromConfiguration(Configuration operato
labelSelector,
getLeaderElectionConfig(operatorConfig),
deletionPropagation,
savepointOnDeletion);
savepointOnDeletion,
savepointCreateSubdir);
}

private static LeaderElectionConfiguration getLeaderElectionConfig(Configuration conf) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -533,4 +533,11 @@ public static String operatorConfigKey(String key) {
.defaultValue(false)
.withDescription(
"Indicate whether a savepoint must be taken when deleting a FlinkDeployment or FlinkSessionJob.");
@Documentation.Section(SECTION_DYNAMIC)
public static final ConfigOption<Boolean> SAVEPOINT_CREATE_SUBDIR =
operatorConfig("job.savepoint-create-subdir")
.booleanType()
.defaultValue(false)
.withDescription(
"Indicate whether to create sub-directories named by job id under the 'state.savepoints.dir'");
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,7 @@
import org.apache.flink.runtime.webmonitor.handlers.JarUploadHeaders;
import org.apache.flink.runtime.webmonitor.handlers.JarUploadResponseBody;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.util.ConfigurationException;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.*;

import io.fabric8.kubernetes.api.model.DeletionPropagation;
import io.fabric8.kubernetes.api.model.ObjectMeta;
Expand Down Expand Up @@ -135,6 +131,7 @@ public abstract class AbstractFlinkService implements FlinkService {
private static final String EMPTY_JAR_FILENAME = "empty.jar";
public static final String FIELD_NAME_TOTAL_CPU = "total-cpu";
public static final String FIELD_NAME_TOTAL_MEMORY = "total-memory";
public static final String FORWARD_SLASH = "/";

protected final KubernetesClient kubernetesClient;
protected final ExecutorService executorService;
Expand Down Expand Up @@ -287,29 +284,33 @@ protected void cancelJob(
deleteClusterDeployment(deployment.getMetadata(), deploymentStatus, conf, true);
break;
case SAVEPOINT:
final String savepointDirectory =
Preconditions.checkNotNull(
conf.get(CheckpointingOptions.SAVEPOINT_DIRECTORY));
final long timeout =
conf.get(ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT)
.getSeconds();
if (ReconciliationUtils.isJobRunning(deploymentStatus)) {
try {
LOG.info("Suspending job with savepoint.");
String savepoint =
clusterClient
.stopWithSavepoint(
Preconditions.checkNotNull(jobId),
false,
savepointDirectory,
conf.get(FLINK_VERSION)
.isNewerVersionThan(
FlinkVersion.v1_14)
? savepointFormatType
: null)
.get(timeout, TimeUnit.SECONDS);
savepointOpt = Optional.of(savepoint);
LOG.info("Job successfully suspended with savepoint {}.", savepoint);
if (!StringUtils.isNullOrWhitespaceOnly(conf.get(CheckpointingOptions.SAVEPOINT_DIRECTORY))){
LOG.info("Suspending job with savepoint.");
final String jobIdSubdir = FORWARD_SLASH + jobId + FORWARD_SLASH;
final String savepointDirectory = operatorConfig.isSavepointCreateSubdir() ?
conf.get(CheckpointingOptions.SAVEPOINT_DIRECTORY) + jobIdSubdir : conf.get(CheckpointingOptions.SAVEPOINT_DIRECTORY);
final long timeout =
conf.get(ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT)
.getSeconds();
String savepoint =
clusterClient
.stopWithSavepoint(
Preconditions.checkNotNull(jobId),
false,
savepointDirectory,
conf.get(FLINK_VERSION)
.isNewerVersionThan(
FlinkVersion.v1_14)
? savepointFormatType
: null)
.get(timeout, TimeUnit.SECONDS);
savepointOpt = Optional.of(savepoint);
LOG.info("Job successfully suspended with savepoint {}.", savepoint);
}else {
LOG.info("Job config 'state.savepoints.dir' is null, skipping cancel-with-savepoint");
}
} catch (TimeoutException exception) {
throw new FlinkException(
String.format(
Expand Down Expand Up @@ -395,41 +396,45 @@ public void cancelSessionJob(
break;
case SAVEPOINT:
if (ReconciliationUtils.isJobRunning(sessionJobStatus)) {
LOG.info("Suspending job with savepoint.");
final String savepointDirectory =
Preconditions.checkNotNull(
conf.get(CheckpointingOptions.SAVEPOINT_DIRECTORY));
final long timeout =
conf.get(ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT)
.getSeconds();
try {
String savepoint =
clusterClient
.stopWithSavepoint(
jobId,
false,
savepointDirectory,
conf.get(FLINK_VERSION)
.isNewerVersionThan(
FlinkVersion.v1_14)
? conf.get(
KubernetesOperatorConfigOptions
.OPERATOR_SAVEPOINT_FORMAT_TYPE)
: null)
.get(timeout, TimeUnit.SECONDS);
savepointOpt = Optional.of(savepoint);
LOG.info(
"Job successfully suspended with savepoint {}.", savepoint);
} catch (TimeoutException exception) {
throw new FlinkException(
String.format(
"Timed out stopping the job %s in Flink cluster %s with savepoint, "
+ "please configure a larger timeout via '%s'",
jobId,
clusterId,
ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT
.key()),
exception);
if(!StringUtils.isNullOrWhitespaceOnly(conf.get(CheckpointingOptions.SAVEPOINT_DIRECTORY))){
LOG.info("Suspending job with savepoint.");
final String jobIdSubdir = FORWARD_SLASH + jobId + FORWARD_SLASH;
final String savepointDirectory = operatorConfig.isSavepointCreateSubdir() ?
conf.get(CheckpointingOptions.SAVEPOINT_DIRECTORY) + jobIdSubdir : conf.get(CheckpointingOptions.SAVEPOINT_DIRECTORY);
final long timeout =
conf.get(ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT)
.getSeconds();
try {
String savepoint =
clusterClient
.stopWithSavepoint(
jobId,
false,
savepointDirectory,
conf.get(FLINK_VERSION)
.isNewerVersionThan(
FlinkVersion.v1_14)
? conf.get(
KubernetesOperatorConfigOptions
.OPERATOR_SAVEPOINT_FORMAT_TYPE)
: null)
.get(timeout, TimeUnit.SECONDS);
savepointOpt = Optional.of(savepoint);
LOG.info(
"Job successfully suspended with savepoint {}.", savepoint);
} catch (TimeoutException exception) {
throw new FlinkException(
String.format(
"Timed out stopping the job %s in Flink cluster %s with savepoint, "
+ "please configure a larger timeout via '%s'",
jobId,
clusterId,
ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT
.key()),
exception);
}
}else {
LOG.info("Job config 'state.savepoints.dir' is null, skipping cancel-with-savepoint");
}
} else {
throw new RuntimeException(
Expand Down Expand Up @@ -467,8 +472,12 @@ public void triggerSavepoint(
savepointTriggerHeaders.getUnresolvedMessageParameters();
savepointTriggerMessageParameters.jobID.resolve(JobID.fromHexString(jobId));

var savepointDirectory =
Preconditions.checkNotNull(conf.get(CheckpointingOptions.SAVEPOINT_DIRECTORY));
var jobIdSubdir = FORWARD_SLASH + jobId + FORWARD_SLASH;
var savepointDirectory = operatorConfig.isSavepointCreateSubdir() ?
Preconditions.checkNotNull(
conf.get(CheckpointingOptions.SAVEPOINT_DIRECTORY)) + jobIdSubdir :
Preconditions.checkNotNull(
conf.get(CheckpointingOptions.SAVEPOINT_DIRECTORY));
var timeout = operatorConfig.getFlinkClientTimeout().getSeconds();

var savepointFormatType = SavepointUtils.getSavepointFormatType(conf);
Expand Down