Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GOBBLIN-1841] Move disabling of current live instances to the GobblinClusterManager startup #3708

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -279,15 +279,7 @@ public synchronized void start() {
LOGGER.info("Starting the Gobblin Cluster Manager");

this.eventBus.register(this);
this.multiManager.connect();

// Standalone mode registers a handler to clean up on manager leadership change, so only clean up for non-standalone
// mode, such as YARN mode
if (!this.isStandaloneMode) {
this.multiManager.cleanUpJobs();
}

configureHelixQuotaBasedTaskScheduling();
setupHelix();

if (this.isStandaloneMode) {
// standalone mode starts non-daemon threads later, so need to have this thread to keep process up
Expand Down Expand Up @@ -316,6 +308,18 @@ public void run() {
this.started = true;
}

public synchronized void setupHelix() {
this.multiManager.connect();

// Standalone mode registers a handler to clean up on manager leadership change, so only clean up for non-standalone
// mode, such as YARN mode
if (!this.isStandaloneMode) {
this.multiManager.cleanUpJobs();
}

configureHelixQuotaBasedTaskScheduling();
}

/**
* Stop the Gobblin Cluster Manager.
*/
Expand Down Expand Up @@ -427,11 +431,18 @@ boolean isHelixManagerConnected() {
*/
@VisibleForTesting
void initializeHelixManager() {
this.multiManager = new GobblinHelixMultiManager(
this.config, aVoid -> GobblinClusterManager.this.getUserDefinedMessageHandlerFactory(), this.eventBus, stopStatus) ;
this.multiManager = createMultiManager();
this.multiManager.addLeadershipChangeAwareComponent(this);
}

/***
* Can be overriden to inject mock GobblinHelixMultiManager
* @return a new GobblinHelixMultiManager
*/
public GobblinHelixMultiManager createMultiManager() {
return new GobblinHelixMultiManager(this.config, aVoid -> GobblinClusterManager.this.getUserDefinedMessageHandlerFactory(), this.eventBus, stopStatus);
}

@VisibleForTesting
void sendShutdownRequest() {
Criteria criteria = new Criteria();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,13 @@
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;

import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixException;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixProperty;
import org.apache.helix.PropertyKey;
import org.apache.helix.manager.zk.ZKHelixManager;
import org.apache.helix.model.HelixConfigScope;
Expand Down Expand Up @@ -448,6 +450,18 @@ public static List<String> getLiveInstances(HelixManager helixManager) {
return accessor.getChildNames(liveInstancesKey);
}

/**
* Getting all instances (Helix Participants) in cluster at this moment.
* Note that the raw result could contain AppMaster node and replanner node.
* @param filterString Helix instances whose name containing fitlerString will pass filtering.
*/
public static Set<String> getParticipants(HelixDataAccessor helixDataAccessor, String filterString) {
PropertyKey.Builder keyBuilder = helixDataAccessor.keyBuilder();
PropertyKey liveInstance = keyBuilder.liveInstances();
Map<String, HelixProperty> childValuesMap = helixDataAccessor.getChildValuesMap(liveInstance);
return childValuesMap.keySet().stream().filter(x -> filterString.isEmpty() || x.contains(filterString)).collect(Collectors.toSet());
}

public static boolean isInstanceLive(HelixManager helixManager, String instanceName) {
HelixDataAccessor accessor = helixManager.getHelixDataAccessor();
PropertyKey liveInstanceKey = accessor.keyBuilder().liveInstance(instanceName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.util.Collections;
import java.util.List;
import java.util.Set;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.DefaultParser;
Expand All @@ -32,6 +33,9 @@
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.NotificationContext;
import org.apache.helix.messaging.handling.HelixTaskResult;
import org.apache.helix.messaging.handling.MessageHandler;
Expand All @@ -52,6 +56,8 @@
import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
import org.apache.gobblin.cluster.GobblinClusterManager;
import org.apache.gobblin.cluster.GobblinClusterUtils;
import org.apache.gobblin.cluster.GobblinHelixMultiManager;
import org.apache.gobblin.cluster.HelixUtils;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.JvmUtils;
import org.apache.gobblin.util.PathUtils;
Expand Down Expand Up @@ -135,6 +141,26 @@ protected MultiTypeMessageHandlerFactory getUserDefinedMessageHandlerFactory() {
return new ControllerUserDefinedMessageHandlerFactory();
}

@Override
public synchronized void setupHelix() {
super.setupHelix();
this.disableTaskRunnersFromPreviousExecutions(this.multiManager);
}

public static void disableTaskRunnersFromPreviousExecutions(GobblinHelixMultiManager multiManager) {
HelixManager helixManager = multiManager.getJobClusterHelixManager();
HelixDataAccessor helixDataAccessor = helixManager.getHelixDataAccessor();
String clusterName = helixManager.getClusterName();
HelixAdmin helixAdmin = helixManager.getClusterManagmentTool();
Set<String> taskRunners = HelixUtils.getParticipants(helixDataAccessor,
GobblinYarnTaskRunner.HELIX_YARN_INSTANCE_NAME_PREFIX);
LOGGER.warn("Found {} task runners in the cluster.", taskRunners.size());
for (String taskRunner : taskRunners) {
LOGGER.warn("Disabling instance: {}", taskRunner);
helixAdmin.enableInstance(clusterName, taskRunner, false);
}
}

/**
* A custom {@link MultiTypeMessageHandlerFactory} for {@link ControllerUserDefinedMessageHandler}s that
* handle messages of type {@link org.apache.helix.model.Message.MessageType#USER_DEFINE_MSG}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,6 @@
import org.apache.avro.Schema;
import org.apache.commons.io.FileUtils;
import org.apache.commons.mail.EmailException;
import org.apache.gobblin.util.hadoop.TokenUtils;
import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
Expand Down Expand Up @@ -71,7 +69,6 @@
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.Records;
import org.apache.helix.Criteria;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
Expand Down Expand Up @@ -117,8 +114,10 @@
import org.apache.gobblin.util.EmailUtils;
import org.apache.gobblin.util.ExecutorsUtils;
import org.apache.gobblin.util.JvmUtils;
import org.apache.gobblin.util.hadoop.TokenUtils;
import org.apache.gobblin.util.io.StreamUtils;
import org.apache.gobblin.util.logs.LogCopier;
import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
import org.apache.gobblin.yarn.event.ApplicationReportArrivalEvent;
import org.apache.gobblin.yarn.event.GetApplicationReportFailureEvent;

Expand Down Expand Up @@ -371,7 +370,6 @@ public void launch() throws IOException, YarnException, InterruptedException {
this.applicationId = getReconnectableApplicationId();

if (!this.applicationId.isPresent()) {
disableLiveHelixInstances();
LOGGER.info("No reconnectable application found so submitting a new application");
this.yarnClient = potentialYarnClients.get(this.originalYarnRMAddress);
this.applicationId = Optional.of(setupAndSubmitApplication());
Expand Down Expand Up @@ -454,7 +452,6 @@ public synchronized void stop() throws IOException, TimeoutException {

if (!this.detachOnExitEnabled) {
LOGGER.info("Disabling all live Helix instances..");
disableLiveHelixInstances();
}

disconnectHelixManager();
Expand Down Expand Up @@ -540,26 +537,6 @@ void connectHelixManager() {
}
}

/**
* A method to disable pre-existing live instances in a Helix cluster. This can happen when a previous Yarn application
* leaves behind orphaned Yarn worker processes. Since Helix does not provide an API to drop a live instance, we use
* the disable instance API to fence off these orphaned instances and prevent them from becoming participants in the
* new cluster.
*
* NOTE: this is a workaround for an existing YARN bug. Once YARN has a fix to guarantee container kills on application
* completion, this method should be removed.
*/
Comment on lines -543 to -551
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add this comment back to the new impl

void disableLiveHelixInstances() {
String clusterName = this.helixManager.getClusterName();
HelixAdmin helixAdmin = this.helixManager.getClusterManagmentTool();
List<String> liveInstances = HelixUtils.getLiveInstances(this.helixManager);
LOGGER.warn("Found {} live instances in the cluster.", liveInstances.size());
for (String instanceName: liveInstances) {
LOGGER.warn("Disabling instance: {}", instanceName);
helixAdmin.enableInstance(clusterName, instanceName, false);
}
}

@VisibleForTesting
void disconnectHelixManager() {
if (this.helixManager.isConnected()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,10 @@
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;


import org.apache.commons.compress.utils.Sets;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.PropertyKey;
import org.apache.helix.task.JobConfig;
import org.apache.helix.task.JobContext;
import org.apache.helix.task.JobDag;
Expand All @@ -56,6 +54,7 @@
import lombok.extern.slf4j.Slf4j;

import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
import org.apache.gobblin.cluster.HelixUtils;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.ExecutorsUtils;

Expand Down Expand Up @@ -184,17 +183,6 @@ public void run() {
}
}

/**
* Getting all instances (Helix Participants) in cluster at this moment.
* Note that the raw result could contains AppMaster node and replanner node.
* @param filterString Helix instances whose name containing fitlerString will pass filtering.
*/
private Set<String> getParticipants(String filterString) {
PropertyKey.Builder keyBuilder = helixDataAccessor.keyBuilder();
return helixDataAccessor.getChildValuesMap(keyBuilder.liveInstances())
.keySet().stream().filter(x -> filterString.isEmpty() || x.contains(filterString)).collect(Collectors.toSet());
}

private String getInuseParticipantForHelixPartition(JobContext jobContext, int partition) {
if (jobContext.getPartitionNumAttempts(partition) > THRESHOLD_NUMBER_OF_ATTEMPTS_FOR_LOGGING) {
log.warn("Helix task {} has been retried for {} times, please check the config to see how we can handle this task better",
Expand Down Expand Up @@ -272,7 +260,7 @@ void runInternal() {
}
// Find all participants appearing in this cluster. Note that Helix instances can contain cluster-manager
// and potentially replanner-instance.
Set<String> allParticipants = getParticipants(HELIX_YARN_INSTANCE_NAME_PREFIX);
Set<String> allParticipants = HelixUtils.getParticipants(helixDataAccessor, HELIX_YARN_INSTANCE_NAME_PREFIX);

// Find all joined participants not in-use for this round of inspection.
// If idle time is beyond tolerance, mark the instance as unused by assigning timestamp as -1.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.yarn;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;

import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixProperty;
import org.apache.helix.PropertyKey;
import org.mockito.Mockito;
import org.testng.annotations.Test;

import junit.framework.TestCase;

import org.apache.gobblin.cluster.GobblinHelixMultiManager;

import static org.mockito.Mockito.times;
import static org.mockito.Mockito.when;


public class GobblinApplicationMasterTest extends TestCase {
@Test
public void testDisableTaskRunnersFromPreviousExecutions() {
GobblinHelixMultiManager mockMultiManager = Mockito.mock(GobblinHelixMultiManager.class);

HelixManager mockHelixManager = Mockito.mock(HelixManager.class);
when(mockMultiManager.getJobClusterHelixManager()).thenReturn(mockHelixManager);

HelixAdmin mockHelixAdmin = Mockito.mock(HelixAdmin.class);
when(mockHelixManager.getClusterManagmentTool()).thenReturn(mockHelixAdmin);
when(mockHelixManager.getClusterName()).thenReturn("mockCluster");

HelixDataAccessor mockAccessor = Mockito.mock(HelixDataAccessor.class);
when(mockHelixManager.getHelixDataAccessor()).thenReturn(mockAccessor);

PropertyKey.Builder mockBuilder = Mockito.mock(PropertyKey.Builder.class);
when(mockAccessor.keyBuilder()).thenReturn(mockBuilder);

PropertyKey mockLiveInstancesKey = Mockito.mock(PropertyKey.class);
when(mockBuilder.liveInstances()).thenReturn(mockLiveInstancesKey);

int instanceCount = 3;

// GobblinYarnTaskRunner prefix would be disabled, while GobblinClusterManager prefix will not
ArrayList<String> gobblinYarnTaskRunnerPrefix = new ArrayList<String>();
ArrayList<String> gobblinClusterManagerPrefix = new ArrayList<String>();
for (int i = 0; i < instanceCount; i++) {
gobblinYarnTaskRunnerPrefix.add("GobblinYarnTaskRunner_TestInstance_" + i);
gobblinClusterManagerPrefix.add("GobblinClusterManager_TestInstance_" + i);
}

Map<String, HelixProperty> mockChildValues = new HashMap<>();
for (int i = 0; i < instanceCount; i++) {
mockChildValues.put(gobblinYarnTaskRunnerPrefix.get(i), Mockito.mock(HelixProperty.class));
mockChildValues.put(gobblinClusterManagerPrefix.get(i), Mockito.mock(HelixProperty.class));
}
when(mockAccessor.getChildValuesMap(mockLiveInstancesKey)).thenReturn(mockChildValues);

GobblinApplicationMaster.disableTaskRunnersFromPreviousExecutions(mockMultiManager);

for (int i = 0; i < instanceCount; i++) {
Mockito.verify(mockHelixAdmin).enableInstance("mockCluster", gobblinYarnTaskRunnerPrefix.get(i), false);
Mockito.verify(mockHelixAdmin, times(0)).enableInstance("mockCluster", gobblinClusterManagerPrefix.get(i), false);
}
}
}
Loading