Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GOBBLIN-1841] Move disabling of current live instances to the GobblinClusterManager startup #3708

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -279,15 +279,7 @@ public synchronized void start() {
LOGGER.info("Starting the Gobblin Cluster Manager");

this.eventBus.register(this);
this.multiManager.connect();

// Standalone mode registers a handler to clean up on manager leadership change, so only clean up for non-standalone
// mode, such as YARN mode
if (!this.isStandaloneMode) {
this.multiManager.cleanUpJobs();
}

configureHelixQuotaBasedTaskScheduling();
setupHelix();

if (this.isStandaloneMode) {
// standalone mode starts non-daemon threads later, so need to have this thread to keep process up
Expand Down Expand Up @@ -316,6 +308,18 @@ public void run() {
this.started = true;
}

/**
 * Connects the Helix multi-manager, cleans up jobs when not running in standalone mode, and then
 * configures Helix quota-based task scheduling.
 */
public synchronized void setupHelix() {
  this.multiManager.connect();

  // In standalone mode a handler registered for manager leadership changes takes care of the cleanup,
  // so the explicit cleanup is only needed for non-standalone modes such as YARN mode.
  boolean needsExplicitCleanUp = !this.isStandaloneMode;
  if (needsExplicitCleanUp) {
    this.multiManager.cleanUpJobs();
  }

  configureHelixQuotaBasedTaskScheduling();
}

/**
* Stop the Gobblin Cluster Manager.
*/
Expand Down Expand Up @@ -427,11 +431,19 @@ boolean isHelixManagerConnected() {
*/
@VisibleForTesting
void initializeHelixManager() {
  // Build the multi-manager exclusively through the overridable factory method. Constructing it inline
  // first would create an instance that is immediately discarded and would bypass any override of
  // createMultiManager() (e.g. one that injects a mock).
  this.multiManager = createMultiManager();
  // Add this cluster manager as a leadership-change-aware component of the multi-manager.
  this.multiManager.addLeadershipChangeAwareComponent(this);
}

/**
 * Creates the {@link GobblinHelixMultiManager} for this cluster manager from its config, its
 * user-defined message handler factory, its event bus and its stop status.
 * Can be overridden to inject a mock GobblinHelixMultiManager.
 *
 * @return a new GobblinHelixMultiManager
 */
public GobblinHelixMultiManager createMultiManager() {
  GobblinHelixMultiManager helixMultiManager = new GobblinHelixMultiManager(
      this.config,
      aVoid -> this.getUserDefinedMessageHandlerFactory(),
      this.eventBus,
      stopStatus);
  return helixMultiManager;
}

@VisibleForTesting
void sendShutdownRequest() {
Criteria criteria = new Criteria();
Expand Down Expand Up @@ -504,6 +516,7 @@ private static void printUsage(Options options) {
formatter.printHelp(GobblinClusterManager.class.getSimpleName(), options);
}


Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove the stray blank line. It will probably fail the linter.

public static void main(String[] args) throws Exception {
Options options = buildOptions();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,13 @@
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;

import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixException;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixProperty;
import org.apache.helix.PropertyKey;
import org.apache.helix.manager.zk.ZKHelixManager;
import org.apache.helix.model.HelixConfigScope;
Expand Down Expand Up @@ -448,6 +450,18 @@ public static List<String> getLiveInstances(HelixManager helixManager) {
return accessor.getChildNames(liveInstancesKey);
}

/**
 * Returns the names of all live instances (Helix participants) present in the cluster at this moment,
 * restricted to those matching the given filter.
 * Note that the raw result could contain the AppMaster node and the replanner node.
 *
 * @param helixDataAccessor accessor used to read the live instances of the cluster
 * @param filterString only Helix instances whose name contains filterString pass the filter; an empty
 *                     string lets every instance through
 * @return the set of matching live instance names
 */
public static Set<String> getParticipants(HelixDataAccessor helixDataAccessor, String filterString) {
  PropertyKey liveInstancesKey = helixDataAccessor.keyBuilder().liveInstances();
  Map<String, HelixProperty> liveInstancesByName = helixDataAccessor.getChildValuesMap(liveInstancesKey);
  return liveInstancesByName.keySet()
      .stream()
      .filter(instanceName -> filterString.isEmpty() || instanceName.contains(filterString))
      .collect(Collectors.toSet());
}

public static boolean isInstanceLive(HelixManager helixManager, String instanceName) {
HelixDataAccessor accessor = helixManager.getHelixDataAccessor();
PropertyKey liveInstanceKey = accessor.keyBuilder().liveInstance(instanceName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.util.Collections;
import java.util.List;
import java.util.Set;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.DefaultParser;
Expand All @@ -32,6 +33,9 @@
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.NotificationContext;
import org.apache.helix.messaging.handling.HelixTaskResult;
import org.apache.helix.messaging.handling.MessageHandler;
Expand All @@ -52,6 +56,8 @@
import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
import org.apache.gobblin.cluster.GobblinClusterManager;
import org.apache.gobblin.cluster.GobblinClusterUtils;
import org.apache.gobblin.cluster.GobblinHelixMultiManager;
import org.apache.gobblin.cluster.HelixUtils;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.JvmUtils;
import org.apache.gobblin.util.PathUtils;
Expand Down Expand Up @@ -135,6 +141,26 @@ protected MultiTypeMessageHandlerFactory getUserDefinedMessageHandlerFactory() {
return new ControllerUserDefinedMessageHandlerFactory();
}

/**
 * Runs the parent Helix setup and then disables the task runner instances found in the job cluster.
 */
@Override
public synchronized void setupHelix() {
  super.setupHelix();
  // The helper is static, so it is invoked without going through the instance.
  disableTaskRunnersFromPreviousExecutions(this.multiManager);
}

/**
 * Disables every live Helix instance in the job cluster whose name contains
 * {@link GobblinYarnTaskRunner#HELIX_YARN_INSTANCE_NAME_PREFIX}.
 *
 * Pre-existing live instances can be present when a previous Yarn application leaves behind orphaned Yarn
 * worker processes. Since Helix does not provide an API to drop a live instance, the disable instance API is
 * used to fence off these orphaned instances and prevent them from becoming participants in the new cluster.
 *
 * NOTE: this is a workaround for an existing YARN bug. Once YARN has a fix to guarantee container kills on
 * application completion, this method should be removed.
 *
 * @param multiManager the multi-manager whose job cluster Helix manager is used to look up and disable instances
 */
public static void disableTaskRunnersFromPreviousExecutions(GobblinHelixMultiManager multiManager) {
HelixManager helixManager = multiManager.getJobClusterHelixManager();
HelixDataAccessor helixDataAccessor = helixManager.getHelixDataAccessor();
String clusterName = helixManager.getClusterName();
HelixAdmin helixAdmin = helixManager.getClusterManagmentTool();
// Only instances whose name contains the task runner prefix are selected; other live instances are left alone.
Set<String> taskRunners = HelixUtils.getParticipants(helixDataAccessor,
GobblinYarnTaskRunner.HELIX_YARN_INSTANCE_NAME_PREFIX);
LOGGER.warn("Found {} task runners in the cluster.", taskRunners.size());
for (String taskRunner: taskRunners) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

whitespace before the :. But this will probably be caught by the linter

LOGGER.warn("Disabling instance: {}", taskRunner);
// Passing false as the enabled flag disables the instance.
helixAdmin.enableInstance(clusterName, taskRunner, false);
}
}

/**
* A custom {@link MultiTypeMessageHandlerFactory} for {@link ControllerUserDefinedMessageHandler}s that
* handle messages of type {@link org.apache.helix.model.Message.MessageType#USER_DEFINE_MSG}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,6 @@
import org.apache.avro.Schema;
import org.apache.commons.io.FileUtils;
import org.apache.commons.mail.EmailException;
import org.apache.gobblin.util.hadoop.TokenUtils;
import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
Expand Down Expand Up @@ -71,7 +69,6 @@
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.Records;
import org.apache.helix.Criteria;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
Expand Down Expand Up @@ -117,8 +114,10 @@
import org.apache.gobblin.util.EmailUtils;
import org.apache.gobblin.util.ExecutorsUtils;
import org.apache.gobblin.util.JvmUtils;
import org.apache.gobblin.util.hadoop.TokenUtils;
import org.apache.gobblin.util.io.StreamUtils;
import org.apache.gobblin.util.logs.LogCopier;
import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
import org.apache.gobblin.yarn.event.ApplicationReportArrivalEvent;
import org.apache.gobblin.yarn.event.GetApplicationReportFailureEvent;

Expand Down Expand Up @@ -371,7 +370,6 @@ public void launch() throws IOException, YarnException, InterruptedException {
this.applicationId = getReconnectableApplicationId();

if (!this.applicationId.isPresent()) {
disableLiveHelixInstances();
LOGGER.info("No reconnectable application found so submitting a new application");
this.yarnClient = potentialYarnClients.get(this.originalYarnRMAddress);
this.applicationId = Optional.of(setupAndSubmitApplication());
Expand Down Expand Up @@ -454,7 +452,6 @@ public synchronized void stop() throws IOException, TimeoutException {

if (!this.detachOnExitEnabled) {
LOGGER.info("Disabling all live Helix instances..");
disableLiveHelixInstances();
}

disconnectHelixManager();
Expand Down Expand Up @@ -540,26 +537,6 @@ void connectHelixManager() {
}
}

/**
* A method to disable pre-existing live instances in a Helix cluster. This can happen when a previous Yarn application
* leaves behind orphaned Yarn worker processes. Since Helix does not provide an API to drop a live instance, we use
* the disable instance API to fence off these orphaned instances and prevent them from becoming participants in the
* new cluster.
*
* NOTE: this is a workaround for an existing YARN bug. Once YARN has a fix to guarantee container kills on application
* completion, this method should be removed.
*/
Comment on lines -543 to -551
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add this comment back to the new implementation?

void disableLiveHelixInstances() {
  // Resolve the cluster name and admin tool once, before touching any instance.
  final String cluster = this.helixManager.getClusterName();
  final HelixAdmin admin = this.helixManager.getClusterManagmentTool();

  final List<String> liveInstanceNames = HelixUtils.getLiveInstances(this.helixManager);
  LOGGER.warn("Found {} live instances in the cluster.", liveInstanceNames.size());

  // Every live instance is disabled; no filtering by instance name is applied here.
  liveInstanceNames.forEach(liveInstanceName -> {
    LOGGER.warn("Disabling instance: {}", liveInstanceName);
    admin.enableInstance(cluster, liveInstanceName, false);
  });
}

@VisibleForTesting
void disconnectHelixManager() {
if (this.helixManager.isConnected()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,10 @@
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;


import org.apache.commons.compress.utils.Sets;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.PropertyKey;
import org.apache.helix.task.JobConfig;
import org.apache.helix.task.JobContext;
import org.apache.helix.task.JobDag;
Expand All @@ -56,6 +54,7 @@
import lombok.extern.slf4j.Slf4j;

import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
import org.apache.gobblin.cluster.HelixUtils;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.ExecutorsUtils;

Expand Down Expand Up @@ -184,17 +183,6 @@ public void run() {
}
}

/**
 * Returns the names of all live instances (Helix participants) in the cluster at this moment that match
 * the given filter.
 * Note that the raw result could contain the AppMaster node and the replanner node.
 *
 * @param filterString only Helix instances whose name contains filterString pass the filter; an empty
 *                     string lets every instance through
 * @return the set of matching live instance names
 */
private Set<String> getParticipants(String filterString) {
  PropertyKey liveInstancesKey = helixDataAccessor.keyBuilder().liveInstances();
  Set<String> liveInstanceNames = helixDataAccessor.getChildValuesMap(liveInstancesKey).keySet();
  return liveInstanceNames.stream()
      .filter(instanceName -> filterString.isEmpty() || instanceName.contains(filterString))
      .collect(Collectors.toSet());
}

private String getInuseParticipantForHelixPartition(JobContext jobContext, int partition) {
if (jobContext.getPartitionNumAttempts(partition) > THRESHOLD_NUMBER_OF_ATTEMPTS_FOR_LOGGING) {
log.warn("Helix task {} has been retried for {} times, please check the config to see how we can handle this task better",
Expand Down Expand Up @@ -272,7 +260,7 @@ void runInternal() {
}
// Find all participants appearing in this cluster. Note that Helix instances can contain cluster-manager
// and potentially replanner-instance.
Set<String> allParticipants = getParticipants(HELIX_YARN_INSTANCE_NAME_PREFIX);
Set<String> allParticipants = HelixUtils.getParticipants(helixDataAccessor,HELIX_YARN_INSTANCE_NAME_PREFIX);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add whitespace after the comma. This will probably fail the linter


// Find all joined participants not in-use for this round of inspection.
// If idle time is beyond tolerance, mark the instance as unused by assigning timestamp as -1.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package org.apache.gobblin.yarn;

import java.util.HashMap;
import java.util.Map;

import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixProperty;
import org.apache.helix.PropertyKey;
import org.apache.helix.model.ClusterConfig;
import org.mockito.Mockito;
import org.testng.annotations.Test;

import junit.framework.TestCase;

import org.apache.gobblin.cluster.GobblinHelixMultiManager;

import static org.mockito.Mockito.when;


/**
 * Unit test for {@link GobblinApplicationMaster#disableTaskRunnersFromPreviousExecutions}, driven entirely by
 * Mockito mocks of the Helix manager, admin and data accessor.
 */
// NOTE(review): this class extends junit.framework.TestCase while the test method uses the TestNG @Test
// annotation -- confirm which framework is intended to run it.
public class GobblinApplicationMasterTest extends TestCase {
/**
 * Populates the mocked live-instance map with three task runner instances and verifies that each of them is
 * disabled through {@code HelixAdmin.enableInstance(cluster, instance, false)}.
 */
@Test
public void testDisableTaskRunnersFromPreviousExecutions() {
GobblinHelixMultiManager mockMultiManager = Mockito.mock(GobblinHelixMultiManager.class);

// The method under test obtains everything else from the job cluster Helix manager.
HelixManager mockHelixManager = Mockito.mock(HelixManager.class);
when(mockMultiManager.getJobClusterHelixManager()).thenReturn(mockHelixManager);

HelixAdmin mockHelixAdmin = Mockito.mock(HelixAdmin.class);
when(mockHelixManager.getClusterManagmentTool()).thenReturn(mockHelixAdmin);
when(mockHelixManager.getClusterName()).thenReturn("mockCluster");

// Wire accessor -> key builder -> live instances key, the lookup chain used to list live instances.
HelixDataAccessor mockAccessor = Mockito.mock(HelixDataAccessor.class);
when(mockHelixManager.getHelixDataAccessor()).thenReturn(mockAccessor);

PropertyKey.Builder mockBuilder = Mockito.mock(PropertyKey.Builder.class);
when(mockAccessor.keyBuilder()).thenReturn(mockBuilder);

PropertyKey mockLiveInstancesKey = Mockito.mock(PropertyKey.class);
when(mockBuilder.liveInstances()).thenReturn(mockLiveInstancesKey);

// Only the keys (instance names) of this map matter to the name filter; the values are placeholder mocks.
// NOTE(review): presumably these names contain GobblinYarnTaskRunner.HELIX_YARN_INSTANCE_NAME_PREFIX -- confirm
// against the constant's value. No non-task-runner instance is added, so the filtering itself is not exercised.
int instanceCount = 3;
Map<String, HelixProperty> mockChildValuesMap = new HashMap<>();
for (int i = 0; i < instanceCount; i++) {
mockChildValuesMap.put("GobblinYarnTaskRunner_TestInstance_" + i, Mockito.mock(HelixProperty.class));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should also be adding non GobblinYarnTaskRunners as part of the cluster. E.g. GobblinClusterManager. Because we'd only want to disable those that start with GobblinYarnTaskRunner.

And then in your mockito verify, you should verify we never call disable on GobblinClusterManager

}

when(mockAccessor.getChildValuesMap(mockLiveInstancesKey)).thenReturn(mockChildValuesMap);

// NOTE(review): the config accessor and cluster config stubs below do not appear to be consumed by
// disableTaskRunnersFromPreviousExecutions as shown in this change -- confirm whether they are needed.
ConfigAccessor mockConfigAccessor = Mockito.mock(ConfigAccessor.class);
when(mockHelixManager.getConfigAccessor()).thenReturn(mockConfigAccessor);

ClusterConfig mockClusterConfig = Mockito.mock(ClusterConfig.class);
when(mockConfigAccessor.getClusterConfig("GobblinApplicationMasterTest")).thenReturn(mockClusterConfig);

GobblinApplicationMaster.disableTaskRunnersFromPreviousExecutions(mockMultiManager);

// Each mocked live instance must have been disabled (enabled flag set to false) in "mockCluster".
for (String mockLiveInstance: mockChildValuesMap.keySet()) {
Mockito.verify(mockHelixAdmin).enableInstance("mockCluster", mockLiveInstance, false);
}
}
}