Skip to content

Commit

Permalink
[GOBBLIN-1841] Move disabling of current live instances to the GobblinClusterManager startup (#3708)
Browse files Browse the repository at this point in the history

* [GOBBLIN-1841] Implement disableLiveHelixInstances and unit test

* [GOBBLIN-1841] clear commit history

* [GOBBLIN-1841] remove disableLiveHelixInstances from Yarn

* [GOBBLIN-1841] remove extra comments

* [GOBBLIN-1841] Implement TestGobblinClusterManager class

* [GOBBLIN-1841] Remove unnecessary imports

* [GOBBLIN-1841] Optimize imports

* [GOBBLIN-1841] Move disableTaskRunnersFromPreviousExecutions to GobblinApplicationMaster

* [GOBBLIN-1841] Add edge cases check

* [GOBBLIN-1840] Fix checkstyle error

* [GOBBLIN-1841] Fix GobblinYarnAppLauncherTest testJobCleanup

* [GOBBLIN-1841] Add back javadoc to disableTaskRunnersFromPreviousExecutions implementation
  • Loading branch information
Peiyingy authored Jun 30, 2023
1 parent 1ecce5b commit 3d14b86
Show file tree
Hide file tree
Showing 7 changed files with 185 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -279,15 +279,7 @@ public synchronized void start() {
LOGGER.info("Starting the Gobblin Cluster Manager");

this.eventBus.register(this);
this.multiManager.connect();

// Standalone mode registers a handler to clean up on manager leadership change, so only clean up for non-standalone
// mode, such as YARN mode
if (!this.isStandaloneMode) {
this.multiManager.cleanUpJobs();
}

configureHelixQuotaBasedTaskScheduling();
setupHelix();

if (this.isStandaloneMode) {
// standalone mode starts non-daemon threads later, so need to have this thread to keep process up
Expand Down Expand Up @@ -316,6 +308,18 @@ public void run() {
this.started = true;
}

/**
 * Connects the Helix multi-manager and prepares Helix for this cluster manager.
 *
 * <p>Steps, in order: connect the multi-manager, clean up jobs when not running in standalone mode,
 * then configure Helix quota-based task scheduling.
 */
public synchronized void setupHelix() {
  multiManager.connect();

  // Standalone mode registers a handler that cleans up on manager leadership change, so the explicit
  // clean-up is only needed for non-standalone deployments such as YARN.
  boolean cleanUpOnStartup = !isStandaloneMode;
  if (cleanUpOnStartup) {
    multiManager.cleanUpJobs();
  }

  configureHelixQuotaBasedTaskScheduling();
}

/**
* Stop the Gobblin Cluster Manager.
*/
Expand Down Expand Up @@ -427,11 +431,18 @@ boolean isHelixManagerConnected() {
*/
@VisibleForTesting
void initializeHelixManager() {
  // Obtain the manager only through createMultiManager(): constructing one directly here as well would
  // build an instance that is immediately discarded and would bypass any overriding factory.
  this.multiManager = createMultiManager();
  // Register this cluster manager so it is notified of leadership changes.
  this.multiManager.addLeadershipChangeAwareComponent(this);
}

/**
 * Builds the {@link GobblinHelixMultiManager} for this cluster manager.
 *
 * <p>Can be overridden to inject a mock {@link GobblinHelixMultiManager}.
 *
 * @return a new GobblinHelixMultiManager
 */
public GobblinHelixMultiManager createMultiManager() {
  GobblinHelixMultiManager manager = new GobblinHelixMultiManager(
      this.config,
      unused -> GobblinClusterManager.this.getUserDefinedMessageHandlerFactory(),
      this.eventBus,
      stopStatus);
  return manager;
}

@VisibleForTesting
void sendShutdownRequest() {
Criteria criteria = new Criteria();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,13 @@
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;

import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixException;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixProperty;
import org.apache.helix.PropertyKey;
import org.apache.helix.manager.zk.ZKHelixManager;
import org.apache.helix.model.HelixConfigScope;
Expand Down Expand Up @@ -452,6 +454,18 @@ public static List<String> getLiveInstances(HelixManager helixManager) {
return accessor.getChildNames(liveInstancesKey);
}

/**
 * Returns the names of all instances (Helix participants) that are live in the cluster at this moment.
 * Note that the unfiltered result could contain the AppMaster node and the replanner node.
 *
 * @param helixDataAccessor accessor used to read the live-instances entries
 * @param filterString only instances whose name contains this string pass the filter; an empty string
 *                     lets every instance through
 * @return the set of matching live instance names
 */
public static Set<String> getParticipants(HelixDataAccessor helixDataAccessor, String filterString) {
  PropertyKey liveInstancesKey = helixDataAccessor.keyBuilder().liveInstances();
  return helixDataAccessor.getChildValuesMap(liveInstancesKey)
      .keySet()
      .stream()
      .filter(instanceName -> filterString.isEmpty() || instanceName.contains(filterString))
      .collect(Collectors.toSet());
}

public static boolean isInstanceLive(HelixManager helixManager, String instanceName) {
HelixDataAccessor accessor = helixManager.getHelixDataAccessor();
PropertyKey liveInstanceKey = accessor.keyBuilder().liveInstance(instanceName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.util.Collections;
import java.util.List;
import java.util.Set;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.DefaultParser;
Expand All @@ -32,6 +33,9 @@
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.NotificationContext;
import org.apache.helix.messaging.handling.HelixTaskResult;
import org.apache.helix.messaging.handling.MessageHandler;
Expand All @@ -52,6 +56,8 @@
import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
import org.apache.gobblin.cluster.GobblinClusterManager;
import org.apache.gobblin.cluster.GobblinClusterUtils;
import org.apache.gobblin.cluster.GobblinHelixMultiManager;
import org.apache.gobblin.cluster.HelixUtils;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.JvmUtils;
import org.apache.gobblin.util.PathUtils;
Expand Down Expand Up @@ -135,6 +141,35 @@ protected MultiTypeMessageHandlerFactory getUserDefinedMessageHandlerFactory() {
return new ControllerUserDefinedMessageHandlerFactory();
}

/**
 * Performs the base Helix setup and then disables task runners that are still registered as live,
 * so that instances left over from a previous execution are fenced off.
 */
@Override
public synchronized void setupHelix() {
  super.setupHelix();
  // disableTaskRunnersFromPreviousExecutions is static, so call it without an instance qualifier.
  disableTaskRunnersFromPreviousExecutions(this.multiManager);
}

/**
 * Disables the task-runner instances that are already live in the job cluster. Such instances can exist
 * when a previous Yarn application leaves behind orphaned Yarn worker processes. Helix offers no API to
 * drop a live instance, so the disable-instance API is used instead to fence off the orphans and keep
 * them from becoming participants in the new cluster.
 *
 * <p>Only live instances whose name contains
 * {@link GobblinYarnTaskRunner#HELIX_YARN_INSTANCE_NAME_PREFIX} are disabled.
 *
 * NOTE: this is a workaround for an existing YARN bug. Once YARN has a fix to guarantee container kills
 * on application completion, this method should be removed.
 *
 * @param multiManager provides the job cluster {@link HelixManager} to operate on
 */
public static void disableTaskRunnersFromPreviousExecutions(GobblinHelixMultiManager multiManager) {
  HelixManager jobClusterManager = multiManager.getJobClusterHelixManager();
  HelixDataAccessor accessor = jobClusterManager.getHelixDataAccessor();
  String cluster = jobClusterManager.getClusterName();
  HelixAdmin admin = jobClusterManager.getClusterManagmentTool();

  Set<String> leftoverRunners =
      HelixUtils.getParticipants(accessor, GobblinYarnTaskRunner.HELIX_YARN_INSTANCE_NAME_PREFIX);
  LOGGER.warn("Found {} task runners in the cluster.", leftoverRunners.size());

  leftoverRunners.forEach(runner -> {
    LOGGER.warn("Disabling instance: {}", runner);
    admin.enableInstance(cluster, runner, false);
  });
}

/**
* A custom {@link MultiTypeMessageHandlerFactory} for {@link ControllerUserDefinedMessageHandler}s that
* handle messages of type {@link org.apache.helix.model.Message.MessageType#USER_DEFINE_MSG}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,6 @@
import org.apache.avro.Schema;
import org.apache.commons.io.FileUtils;
import org.apache.commons.mail.EmailException;
import org.apache.gobblin.util.hadoop.TokenUtils;
import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
Expand Down Expand Up @@ -71,7 +69,6 @@
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.Records;
import org.apache.helix.Criteria;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
Expand Down Expand Up @@ -117,8 +114,10 @@
import org.apache.gobblin.util.EmailUtils;
import org.apache.gobblin.util.ExecutorsUtils;
import org.apache.gobblin.util.JvmUtils;
import org.apache.gobblin.util.hadoop.TokenUtils;
import org.apache.gobblin.util.io.StreamUtils;
import org.apache.gobblin.util.logs.LogCopier;
import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
import org.apache.gobblin.yarn.event.ApplicationReportArrivalEvent;
import org.apache.gobblin.yarn.event.GetApplicationReportFailureEvent;

Expand Down Expand Up @@ -371,7 +370,6 @@ public void launch() throws IOException, YarnException, InterruptedException {
this.applicationId = getReconnectableApplicationId();

if (!this.applicationId.isPresent()) {
disableLiveHelixInstances();
LOGGER.info("No reconnectable application found so submitting a new application");
this.yarnClient = potentialYarnClients.get(this.originalYarnRMAddress);
this.applicationId = Optional.of(setupAndSubmitApplication());
Expand Down Expand Up @@ -454,7 +452,6 @@ public synchronized void stop() throws IOException, TimeoutException {

if (!this.detachOnExitEnabled) {
LOGGER.info("Disabling all live Helix instances..");
disableLiveHelixInstances();
}

disconnectHelixManager();
Expand Down Expand Up @@ -540,26 +537,6 @@ void connectHelixManager() {
}
}

/**
 * Disables every instance that is currently live in the Helix cluster. Live instances can pre-exist when
 * a previous Yarn application leaves behind orphaned Yarn worker processes. Helix offers no API to drop
 * a live instance, so the disable-instance API is used instead to fence off the orphans and keep them
 * from becoming participants in the new cluster.
 *
 * NOTE: this is a workaround for an existing YARN bug. Once YARN has a fix to guarantee container kills
 * on application completion, this method should be removed.
 */
void disableLiveHelixInstances() {
  String cluster = this.helixManager.getClusterName();
  HelixAdmin admin = this.helixManager.getClusterManagmentTool();
  List<String> live = HelixUtils.getLiveInstances(this.helixManager);

  LOGGER.warn("Found {} live instances in the cluster.", live.size());
  live.forEach(instance -> {
    LOGGER.warn("Disabling instance: {}", instance);
    admin.enableInstance(cluster, instance, false);
  });
}

@VisibleForTesting
void disconnectHelixManager() {
if (this.helixManager.isConnected()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.PropertyKey;
import org.apache.helix.task.JobConfig;
import org.apache.helix.task.JobContext;
import org.apache.helix.task.JobDag;
Expand All @@ -56,6 +55,7 @@
import lombok.extern.slf4j.Slf4j;

import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
import org.apache.gobblin.cluster.HelixUtils;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.ExecutorsUtils;

Expand Down Expand Up @@ -184,17 +184,6 @@ public void run() {
}
}

/**
 * Returns the names of all instances (Helix participants) that are live in the cluster at this moment.
 * Note that the unfiltered result could contain the AppMaster node and the replanner node.
 *
 * @param filterString only instances whose name contains this string pass the filter; an empty string
 *                     lets every instance through
 * @return the set of matching live instance names
 */
private Set<String> getParticipants(String filterString) {
  PropertyKey liveInstancesKey = helixDataAccessor.keyBuilder().liveInstances();
  Set<String> liveInstanceNames = helixDataAccessor.getChildValuesMap(liveInstancesKey).keySet();
  return liveInstanceNames.stream()
      .filter(name -> filterString.isEmpty() || name.contains(filterString))
      .collect(Collectors.toSet());
}

private String getInuseParticipantForHelixPartition(JobContext jobContext, int partition) {
if (jobContext.getPartitionNumAttempts(partition) > THRESHOLD_NUMBER_OF_ATTEMPTS_FOR_LOGGING) {
log.warn("Helix task {} has been retried for {} times, please check the config to see how we can handle this task better",
Expand Down Expand Up @@ -275,7 +264,7 @@ void runInternal() {
}
// Find all participants appearing in this cluster. Note that Helix instances can contain cluster-manager
// and potentially replanner-instance.
Set<String> allParticipants = getParticipants(HELIX_YARN_INSTANCE_NAME_PREFIX);
Set<String> allParticipants = HelixUtils.getParticipants(helixDataAccessor, HELIX_YARN_INSTANCE_NAME_PREFIX);

// Find all joined participants not in-use for this round of inspection.
// If idle time is beyond tolerance, mark the instance as unused by assigning timestamp as -1.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.yarn;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;

import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixProperty;
import org.apache.helix.PropertyKey;
import org.mockito.Mockito;
import org.testng.annotations.Test;

import junit.framework.TestCase;

import org.apache.gobblin.cluster.GobblinHelixMultiManager;

import static org.mockito.Mockito.times;
import static org.mockito.Mockito.when;


/**
 * Unit test for {@link GobblinApplicationMaster#disableTaskRunnersFromPreviousExecutions}.
 */
public class GobblinApplicationMasterTest extends TestCase {
  @Test
  public void testDisableTaskRunnersFromPreviousExecutions() {
    // Mock chain: multi-manager -> job cluster Helix manager -> admin / data accessor -> key builder.
    HelixManager helixManager = Mockito.mock(HelixManager.class);
    HelixAdmin helixAdmin = Mockito.mock(HelixAdmin.class);
    HelixDataAccessor dataAccessor = Mockito.mock(HelixDataAccessor.class);
    PropertyKey.Builder keyBuilder = Mockito.mock(PropertyKey.Builder.class);
    PropertyKey liveInstancesKey = Mockito.mock(PropertyKey.class);
    GobblinHelixMultiManager multiManager = Mockito.mock(GobblinHelixMultiManager.class);

    when(multiManager.getJobClusterHelixManager()).thenReturn(helixManager);
    when(helixManager.getClusterManagmentTool()).thenReturn(helixAdmin);
    when(helixManager.getClusterName()).thenReturn("mockCluster");
    when(helixManager.getHelixDataAccessor()).thenReturn(dataAccessor);
    when(dataAccessor.keyBuilder()).thenReturn(keyBuilder);
    when(keyBuilder.liveInstances()).thenReturn(liveInstancesKey);

    int instanceCount = 3;

    // Instances named with the GobblinYarnTaskRunner prefix are expected to be disabled, while those
    // named with the GobblinClusterManager prefix must be left alone.
    ArrayList<String> taskRunnerNames = new ArrayList<>();
    ArrayList<String> clusterManagerNames = new ArrayList<>();
    Map<String, HelixProperty> liveInstances = new HashMap<>();
    for (int i = 0; i < instanceCount; i++) {
      String taskRunnerName = "GobblinYarnTaskRunner_TestInstance_" + i;
      String clusterManagerName = "GobblinClusterManager_TestInstance_" + i;
      taskRunnerNames.add(taskRunnerName);
      clusterManagerNames.add(clusterManagerName);
      liveInstances.put(taskRunnerName, Mockito.mock(HelixProperty.class));
      liveInstances.put(clusterManagerName, Mockito.mock(HelixProperty.class));
    }
    when(dataAccessor.getChildValuesMap(liveInstancesKey)).thenReturn(liveInstances);

    GobblinApplicationMaster.disableTaskRunnersFromPreviousExecutions(multiManager);

    for (String taskRunnerName : taskRunnerNames) {
      Mockito.verify(helixAdmin).enableInstance("mockCluster", taskRunnerName, false);
    }
    for (String clusterManagerName : clusterManagerNames) {
      Mockito.verify(helixAdmin, times(0)).enableInstance("mockCluster", clusterManagerName, false);
    }
  }
}
Loading

0 comments on commit 3d14b86

Please sign in to comment.