Skip to content

Commit

Permalink
[INFINITY-1485] Fix update plan for HDFS (#972)
Browse files Browse the repository at this point in the history
* Add update type to update result

* Override deploy plan with update plan

* Add unit tests for plan overrider

* [HDFS] Make update plan strategy configurable

* Persist last completed deployment type

Also address PR comments

* Bump the timeout for health check

* Update properties test

* Address PR comments
  • Loading branch information
gabrielhartmann authored May 12, 2017
1 parent 387ec1d commit 715d27f
Show file tree
Hide file tree
Showing 13 changed files with 244 additions and 45 deletions.
23 changes: 19 additions & 4 deletions frameworks/hdfs/src/main/dist/svc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -260,24 +260,39 @@ plans:
deploy:
strategy: serial
phases:
jn-deploy:
journal:
strategy: {{DEPLOY_STRATEGY}}
pod: journal
nn-deploy:
name:
strategy: serial
pod: name
steps:
- 0: [[format], [node]]
- 1: [[bootstrap], [node]]
zkfc-deploy:
zkfc:
strategy: serial
pod: name
steps:
- 0: [[zkfc-format], [zkfc]]
- 1: [[zkfc]]
dn-deploy:
data:
strategy: {{DEPLOY_STRATEGY}}
pod: data
update:
strategy: serial
phases:
journal:
strategy: {{UPDATE_STRATEGY}}
pod: journal
name:
strategy: {{UPDATE_STRATEGY}}
pod: name
steps:
- 0: [[node, zkfc]]
- 1: [[node, zkfc]]
data:
strategy: serial
pod: data

replace-nn:
strategy: serial
Expand Down
4 changes: 2 additions & 2 deletions frameworks/hdfs/tests/test_shakedown.py
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,6 @@ def find_java_home(host):


def check_healthy(count=DEFAULT_TASK_COUNT):
plan.wait_for_completed_deployment(PACKAGE_NAME)
plan.wait_for_completed_recovery(PACKAGE_NAME)
plan.wait_for_completed_deployment(PACKAGE_NAME, timeout_seconds=20 * 60)
plan.wait_for_completed_recovery(PACKAGE_NAME, timeout_seconds=20 * 60)
tasks.check_running(PACKAGE_NAME, count)
5 changes: 5 additions & 0 deletions frameworks/hdfs/universe/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@
"type": "string",
"default": "parallel"
},
"update_strategy": {
"description": "HDFS update strategy. [parallel, serial]",
"type": "string",
"default": "serial"
},
"tls": {
"type": "object",
"description": "TLS configuration properties.",
Expand Down
1 change: 1 addition & 0 deletions frameworks/hdfs/universe/marathon.json.mustache
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
"JRE_VERSION": "jre1.8.0_112",
"TASKCFG_ALL_JRE_VERSION": "jre1.8.0_112",
"DEPLOY_STRATEGY": "{{service.deploy_strategy}}",
"UPDATE_STRATEGY": "{{service.update_strategy}}",
"SERVICE_PRINCIPAL": "{{service.principal}}",
"JOURNAL_CPUS": "{{journal_node.cpus}}",
"JOURNAL_MEM": "{{journal_node.mem}}",
Expand Down
3 changes: 2 additions & 1 deletion frameworks/helloworld/tests/test_sanity.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,9 @@ def check_for_nonempty_properties():

stdout = cmd.run_cli('hello-world state properties')
jsonobj = json.loads(stdout)
assert len(jsonobj) == 1
assert len(jsonobj) == 2
assert jsonobj[0] == "suppressed"
assert jsonobj[1] == "last-completed-update-type"

stdout = cmd.run_cli('hello-world state property suppressed')
assert stdout == "true\n"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,50 @@ public interface ConfigurationUpdater<C extends Configuration> {
* The result of an {@link ConfigurationUpdater#updateConfiguration(Configuration)} call.
*/
class UpdateResult {
private final UUID targetId;
private final DeploymentType deploymentType;
private final Collection<ConfigValidationError> errors;

/**
 * Creates the result of a configuration update.
 *
 * @param targetId the configuration ID stored as this result's target
 * @param deploymentType the deployment type stored for this result
 * @param errors the validation errors stored for this result
 */
public UpdateResult(
        UUID targetId,
        DeploymentType deploymentType,
        Collection<ConfigValidationError> errors) {
    // The references are stored exactly as given; no copies are made.
    this.targetId = targetId;
    this.deploymentType = deploymentType;
    this.errors = errors;
}

/**
* The resulting configuration ID which should be used by service tasks.
* Two types of deployments are differentiated by this type. Either a service is
* being deployed for the first time, "DEPLOY", or it is updating a previously deployed
* version of the service, "UPDATE".
*/
public final UUID targetId;
public enum DeploymentType {
// Returned when no last-completed update type has been stored yet.
NONE,
// The service is being deployed for the first time.
DEPLOY,
// A previously deployed version of the service is being updated.
UPDATE
}

/**
* A list of zero or more validation errors with the current configuration. If there were
* Gets the {@link DeploymentType} detected during this update.
*/
public DeploymentType getDeploymentType() {
    return this.deploymentType;
}

/**
* Gets a list of zero or more validation errors with the current configuration. If there were
* errors, the {@link #targetId} will point to a previous valid configuration.
*/
public final Collection<ConfigValidationError> errors;
public Collection<ConfigValidationError> getErrors() {
    // The stored collection is returned directly, not a copy.
    return this.errors;
}

public UpdateResult(UUID targetId, Collection<ConfigValidationError> errors) {
this.targetId = targetId;
this.errors = errors;
/**
* Gets the resulting configuration ID which should be used by service tasks.
*/
public UUID getTargetId() {
    return this.targetId;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,6 @@
import com.google.common.base.Splitter;
import com.google.common.collect.Lists;
import com.google.protobuf.TextFormat;
import org.apache.mesos.Protos;
import org.apache.mesos.Protos.TaskInfo;

import com.mesosphere.sdk.config.validate.ConfigValidationError;
import com.mesosphere.sdk.config.validate.ConfigValidator;
import com.mesosphere.sdk.offer.TaskException;
Expand All @@ -16,10 +13,11 @@
import com.mesosphere.sdk.specification.PodSpec;
import com.mesosphere.sdk.specification.ServiceSpec;
import com.mesosphere.sdk.state.StateStore;
import com.mesosphere.sdk.state.StateStoreUtils;
import com.mesosphere.sdk.storage.StorageError.Reason;

import difflib.DiffUtils;

import org.apache.mesos.Protos;
import org.apache.mesos.Protos.TaskInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -37,6 +35,7 @@ public class DefaultConfigurationUpdater implements ConfigurationUpdater<Service
private final ConfigStore<ServiceSpec> configStore;
private final ConfigurationComparator<ServiceSpec> configComparator;
private final Collection<ConfigValidator<ServiceSpec>> validators;
private final UpdateResult.DeploymentType lastUpdateType;

public DefaultConfigurationUpdater(
StateStore stateStore,
Expand All @@ -47,6 +46,7 @@ public DefaultConfigurationUpdater(
this.configStore = configStore;
this.configComparator = configComparator;
this.validators = validators;
this.lastUpdateType = StateStoreUtils.getLastCompletedUpdateType(stateStore);
}

@Override
Expand Down Expand Up @@ -133,7 +133,12 @@ public UpdateResult updateConfiguration(ServiceSpec candidateConfig) throws Conf
// leftover configs which are not the target and which are not referenced by any tasks.
cleanupDuplicateAndUnusedConfigs(targetConfig, targetConfigId);

return new ConfigurationUpdater.UpdateResult(targetConfigId, errors);
UpdateResult.DeploymentType updateType =
lastUpdateType.equals(UpdateResult.DeploymentType.NONE) ?
UpdateResult.DeploymentType.DEPLOY :
UpdateResult.DeploymentType.UPDATE;

return new ConfigurationUpdater.UpdateResult(targetConfigId, updateType, errors);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@
*/
public class Constants {

/** The name used for the task deployment plan. */
/** The name used for the deployment plan. */
public static final String DEPLOY_PLAN_NAME = "deploy";
/** The name used for the update plan. */
public static final String UPDATE_PLAN_NAME = "update";

/** The name used for reserved network port resources. */
public static final String PORTS_RESOURCE_TYPE = "ports";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ public class DefaultScheduler implements Scheduler, Observer {
protected final ConfigStore<ServiceSpec> configStore;
protected final Optional<RecoveryPlanManagerFactory> recoveryPlanManagerFactoryOptional;
private final Optional<ReplacementFailurePolicy> failurePolicyOptional;
private final ConfigurationUpdater.UpdateResult updateResult;

private JettyApiServer apiServer;
private Stopwatch apiServerStopwatch = Stopwatch.createStarted();
Expand Down Expand Up @@ -333,9 +334,9 @@ public DefaultScheduler build() {
stateStore,
configStore,
configValidatorsOptional.orElse(defaultConfigValidators()));
if (!configUpdateResult.errors.isEmpty()) {
if (!configUpdateResult.getErrors().isEmpty()) {
LOGGER.warn("Failed to update configuration due to errors with configuration {}: {}",
configUpdateResult.targetId, configUpdateResult.errors);
configUpdateResult.getTargetId(), configUpdateResult.getErrors());
}

// Get or generate plans. Any plan generation is against the service spec that we just updated:
Expand Down Expand Up @@ -363,12 +364,13 @@ public DefaultScheduler build() {
}
}

plans = overrideDeployPlan(plans, configUpdateResult);
Optional<Plan> deployOptional = getDeployPlan(plans);
if (!deployOptional.isPresent()) {
throw new IllegalStateException("No deploy plan provided.");
}

List<String> errors = configUpdateResult.errors.stream()
List<String> errors = configUpdateResult.getErrors().stream()
.map(configValidationError -> configValidationError.toString())
.collect(Collectors.toList());
plans = updateDeployPlan(plans, errors);
Expand All @@ -381,13 +383,54 @@ public DefaultScheduler build() {
stateStore,
configStore,
new DefaultOfferRequirementProvider(
stateStore, serviceSpec.getName(), configUpdateResult.targetId, getSchedulerFlags()),
stateStore, serviceSpec.getName(), configUpdateResult.getTargetId(), getSchedulerFlags()),
endpointProducers,
restartHookOptional,
Optional.ofNullable(recoveryPlanManagerFactory));
Optional.ofNullable(recoveryPlanManagerFactory),
configUpdateResult);
}

/**
 * Given the plans specified and the update scenario, the deploy plan may be overridden by a specified update
 * plan.
 *
 * <p>When the detected deployment type is {@code UPDATE} and a plan named {@link Constants#UPDATE_PLAN_NAME}
 * is present, the returned collection holds a new plan named {@link Constants#DEPLOY_PLAN_NAME} built from the
 * update plan's children and strategy, and no longer holds the original deploy or update plans. In every other
 * case the input collection is returned as-is.
 *
 * @param plans the plans to inspect; this collection itself is never modified
 * @param updateResult the result of the configuration update, supplying the detected deployment type
 * @return either {@code plans} unchanged, or a new list in which the update plan has replaced the deploy plan
 */
public static Collection<Plan> overrideDeployPlan(
        Collection<Plan> plans,
        ConfigurationUpdater.UpdateResult updateResult) {

    Optional<Plan> updatePlanOptional = plans.stream()
            .filter(plan -> plan.getName().equals(Constants.UPDATE_PLAN_NAME))
            .findFirst();

    // Parameterized logging: the message is only assembled if INFO is enabled.
    LOGGER.info("Update type: '{}', Found update plan: '{}'",
            updateResult.getDeploymentType().name(),
            updatePlanOptional.isPresent());

    boolean isUpdate =
            updateResult.getDeploymentType() == ConfigurationUpdater.UpdateResult.DeploymentType.UPDATE;
    if (!isUpdate || !updatePlanOptional.isPresent()) {
        return plans;
    }

    LOGGER.info("Overriding deploy plan with update plan.");

    // Re-label the update plan's contents under the deploy plan's name.
    Plan updatePlan = updatePlanOptional.get();
    Plan deployPlan = new DefaultPlan(
            Constants.DEPLOY_PLAN_NAME,
            updatePlan.getChildren(),
            updatePlan.getStrategy(),
            Collections.emptyList());

    // Collect straight into a mutable list rather than copying an intermediate one.
    List<Plan> overriddenPlans = plans.stream()
            .filter(plan -> !plan.getName().equals(Constants.DEPLOY_PLAN_NAME))
            .filter(plan -> !plan.getName().equals(Constants.UPDATE_PLAN_NAME))
            .collect(Collectors.toCollection(ArrayList::new));
    overriddenPlans.add(deployPlan);

    return overriddenPlans;
}
}


/**
* Creates a new {@link Builder} based on the provided {@link ServiceSpec} describing the service, including
* details such as the service name, the pods/tasks to be deployed, and the plans describing how the deployment
Expand Down Expand Up @@ -532,7 +575,8 @@ protected DefaultScheduler(
OfferRequirementProvider offerRequirementProvider,
Map<String, EndpointProducer> customEndpointProducers,
Optional<RestartHook> restartHookOptional,
Optional<RecoveryPlanManagerFactory> recoveryPlanManagerFactoryOptional) {
Optional<RecoveryPlanManagerFactory> recoveryPlanManagerFactoryOptional,
ConfigurationUpdater.UpdateResult updateResult) {
this.serviceSpec = serviceSpec;
this.schedulerFlags = schedulerFlags;
this.resources = resources;
Expand All @@ -544,6 +588,7 @@ protected DefaultScheduler(
this.customRestartHook = restartHookOptional;
this.recoveryPlanManagerFactoryOptional = recoveryPlanManagerFactoryOptional;
this.failurePolicyOptional = serviceSpec.getReplacementFailurePolicy();
this.updateResult = updateResult;
}

public Collection<Object> getResources() throws InterruptedException {
Expand Down Expand Up @@ -720,6 +765,13 @@ private Optional<ResourceCleanerScheduler> getCleanerScheduler() {
public void update(Observable observable) {
    // Notifications from anything other than the plan coordinator are ignored.
    if (observable != planCoordinator) {
        return;
    }
    suppressOrRevive();
    completeDeploy();
}

/**
 * Records the deployment type of this scheduler's update result as the last completed update type, but only
 * when the plan coordinator reports that it has no operations.
 */
private void completeDeploy() {
    if (planCoordinator.hasOperations()) {
        // The coordinator still reports operations, so nothing is recorded yet.
        return;
    }
    StateStoreUtils.setLastCompletedUpdateType(stateStore, updateResult);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.google.protobuf.TextFormat;
import com.mesosphere.sdk.config.ConfigStore;
import com.mesosphere.sdk.config.ConfigurationUpdater;
import com.mesosphere.sdk.offer.MesosResource;
import com.mesosphere.sdk.offer.TaskException;
import com.mesosphere.sdk.offer.TaskUtils;
Expand All @@ -10,7 +11,6 @@
import com.mesosphere.sdk.specification.ServiceSpec;
import com.mesosphere.sdk.specification.TaskSpec;
import com.mesosphere.sdk.storage.StorageError.Reason;

import org.apache.commons.lang3.StringUtils;
import org.apache.mesos.Protos;
import org.apache.mesos.Protos.TaskInfo;
Expand All @@ -19,6 +19,7 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.stream.Collectors;

Expand All @@ -29,6 +30,7 @@ public class StateStoreUtils {

private static final Logger LOGGER = LoggerFactory.getLogger(StateStoreUtils.class);
private static final String SUPPRESSED_PROPERTY_KEY = "suppressed";
private static final String LAST_COMPLETED_UPDATE_TYPE_KEY = "last-completed-update-type";
private static final int MAX_VALUE_LENGTH_BYTES = 1024 * 1024; // 1MB

private StateStoreUtils() {
Expand Down Expand Up @@ -241,4 +243,30 @@ public static void setSuppressed(StateStore stateStore, boolean isSuppressed) {

stateStore.storeProperty(SUPPRESSED_PROPERTY_KEY, bytes);
}

/**
 * Stores the deployment type of the given update result as the last completed update type.
 *
 * <p>The name of the enum constant is written as UTF-8 bytes under the {@code last-completed-update-type}
 * property of the state store.
 *
 * @param stateStore the state store to write the property to
 * @param updateResult the update result whose deployment type is recorded
 */
public static void setLastCompletedUpdateType(
        StateStore stateStore,
        ConfigurationUpdater.UpdateResult updateResult) {
    String typeName = updateResult.getDeploymentType().name();
    byte[] encodedType = typeName.getBytes(StandardCharsets.UTF_8);
    stateStore.storeProperty(LAST_COMPLETED_UPDATE_TYPE_KEY, encodedType);
}

/**
 * Reads the last completed update type from the state store.
 *
 * @param stateStore the state store to read the {@code last-completed-update-type} property from
 * @return {@code NONE} when the fetched property value is empty, otherwise the {@code DeploymentType} constant
 *         whose name equals the stored value decoded as UTF-8
 * @throws IllegalArgumentException if the stored value does not name a {@code DeploymentType} constant
 */
public static ConfigurationUpdater.UpdateResult.DeploymentType getLastCompletedUpdateType(StateStore stateStore) {
    byte[] storedType = fetchPropertyOrEmptyArray(stateStore, LAST_COMPLETED_UPDATE_TYPE_KEY);
    if (storedType.length == 0) {
        // Nothing has been recorded under this key.
        return ConfigurationUpdater.UpdateResult.DeploymentType.NONE;
    }

    String typeName = new String(storedType, StandardCharsets.UTF_8);
    return ConfigurationUpdater.UpdateResult.DeploymentType.valueOf(typeName);
}
}
Loading

0 comments on commit 715d27f

Please sign in to comment.