Skip to content

Commit

Permalink
[GOBBLIN-1841] Move disableTaskRunnersFromPreviousExecutions to Gobbl…
Browse files Browse the repository at this point in the history
…inApplicationMaster
  • Loading branch information
Peiyingy committed Jun 26, 2023
1 parent 778b29f commit 7914810
Show file tree
Hide file tree
Showing 8 changed files with 22 additions and 145 deletions.
4 changes: 0 additions & 4 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,6 @@
apply from: 'gradle/scripts/environment.gradle'

buildscript {
configurations.all {
exclude group: 'org.slf4j', module: 'slf4j-log4j12'
}

apply from: 'gradle/scripts/repositories.gradle'
apply from: 'gradle/scripts/defaultBuildProperties.gradle'
apply from: 'gradle/scripts/computeVersions.gradle'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import java.util.Collection;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;

Expand All @@ -36,7 +35,6 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.helix.Criteria;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixManager;
import org.apache.helix.InstanceType;
import org.apache.helix.messaging.handling.MultiTypeMessageHandlerFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import org.apache.helix.manager.zk.ZKHelixManager;
import org.apache.helix.model.HelixConfigScope;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.task.JobConfig;
import org.apache.helix.task.JobContext;
import org.apache.helix.task.TargetState;
Expand Down Expand Up @@ -453,11 +452,10 @@ public static List<String> getLiveInstances(HelixManager helixManager) {

/**
* Getting all instances (Helix Participants) in cluster at this moment.
* Note that the raw result could contains AppMaster node and replanner node.
* Note that the raw result could contain AppMaster node and replanner node.
* @param filterString Helix instances whose name containing fitlerString will pass filtering.
*/
public static Set<String> getParticipants(HelixManager helixManager, String filterString) {
HelixDataAccessor helixDataAccessor = helixManager.getHelixDataAccessor();
public static Set<String> getParticipants(HelixDataAccessor helixDataAccessor, String filterString) {
PropertyKey.Builder keyBuilder = helixDataAccessor.keyBuilder();
PropertyKey liveInstance = keyBuilder.liveInstances();
Map<String, HelixProperty> childValuesMap = helixDataAccessor.getChildValuesMap(liveInstance);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.NotificationContext;
import org.apache.helix.messaging.handling.HelixTaskResult;
Expand All @@ -55,6 +56,7 @@
import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
import org.apache.gobblin.cluster.GobblinClusterManager;
import org.apache.gobblin.cluster.GobblinClusterUtils;
import org.apache.gobblin.cluster.GobblinHelixMultiManager;
import org.apache.gobblin.cluster.HelixUtils;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.JvmUtils;
Expand Down Expand Up @@ -142,14 +144,15 @@ protected MultiTypeMessageHandlerFactory getUserDefinedMessageHandlerFactory() {
@Override
public synchronized void setupHelix() {
super.setupHelix();
this.disableTaskRunnersFromPreviousExecutions();
this.disableTaskRunnersFromPreviousExecutions(this.multiManager);
}

public void disableTaskRunnersFromPreviousExecutions() {
HelixManager helixManager = this.multiManager.getJobClusterHelixManager();
public static void disableTaskRunnersFromPreviousExecutions(GobblinHelixMultiManager multiManager) {
HelixManager helixManager = multiManager.getJobClusterHelixManager();
HelixDataAccessor helixDataAccessor = helixManager.getHelixDataAccessor();
String clusterName = helixManager.getClusterName();
HelixAdmin helixAdmin = helixManager.getClusterManagmentTool();
Set<String> taskRunners = HelixUtils.getParticipants(helixManager,
Set<String> taskRunners = HelixUtils.getParticipants(helixDataAccessor,
GobblinYarnTaskRunner.HELIX_YARN_INSTANCE_NAME_PREFIX);
LOGGER.warn("Found {} task runners in the cluster.", taskRunners.size());
for (String taskRunner: taskRunners) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,6 @@
import org.apache.avro.Schema;
import org.apache.commons.io.FileUtils;
import org.apache.commons.mail.EmailException;
import org.apache.gobblin.util.hadoop.TokenUtils;
import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
Expand Down Expand Up @@ -71,7 +69,6 @@
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.Records;
import org.apache.helix.Criteria;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
Expand Down Expand Up @@ -117,8 +114,10 @@
import org.apache.gobblin.util.EmailUtils;
import org.apache.gobblin.util.ExecutorsUtils;
import org.apache.gobblin.util.JvmUtils;
import org.apache.gobblin.util.hadoop.TokenUtils;
import org.apache.gobblin.util.io.StreamUtils;
import org.apache.gobblin.util.logs.LogCopier;
import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
import org.apache.gobblin.yarn.event.ApplicationReportArrivalEvent;
import org.apache.gobblin.yarn.event.GetApplicationReportFailureEvent;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,10 @@
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;


import org.apache.commons.compress.utils.Sets;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.PropertyKey;
import org.apache.helix.task.JobConfig;
import org.apache.helix.task.JobContext;
import org.apache.helix.task.JobDag;
Expand Down Expand Up @@ -185,17 +183,6 @@ public void run() {
}
}

/**
* Getting all instances (Helix Participants) in cluster at this moment.
* Note that the raw result could contains AppMaster node and replanner node.
* @param filterString Helix instances whose name containing fitlerString will pass filtering.
*/
private Set<String> getParticipants(String filterString) {
PropertyKey.Builder keyBuilder = helixDataAccessor.keyBuilder();
return helixDataAccessor.getChildValuesMap(keyBuilder.liveInstances())
.keySet().stream().filter(x -> filterString.isEmpty() || x.contains(filterString)).collect(Collectors.toSet());
}

private String getInuseParticipantForHelixPartition(JobContext jobContext, int partition) {
if (jobContext.getPartitionNumAttempts(partition) > THRESHOLD_NUMBER_OF_ATTEMPTS_FOR_LOGGING) {
log.warn("Helix task {} has been retried for {} times, please check the config to see how we can handle this task better",
Expand Down Expand Up @@ -273,7 +260,7 @@ void runInternal() {
}
// Find all participants appearing in this cluster. Note that Helix instances can contain cluster-manager
// and potentially replanner-instance.
Set<String> allParticipants = HelixUtils.getParticipants(helixManager,HELIX_YARN_INSTANCE_NAME_PREFIX);
Set<String> allParticipants = HelixUtils.getParticipants(helixDataAccessor,HELIX_YARN_INSTANCE_NAME_PREFIX);

// Find all joined participants not in-use for this round of inspection.
// If idle time is beyond tolerance, mark the instance as unused by assigning timestamp as -1.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,73 +1,29 @@
package org.apache.gobblin.yarn;

import java.net.URL;
import java.util.HashMap;
import java.util.Map;

import org.apache.commons.cli.Options;
import org.apache.curator.test.TestingServer;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.HelixProperty;
import org.apache.helix.InstanceType;
import org.apache.helix.PropertyKey;
import org.apache.helix.model.ClusterConfig;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigValueFactory;

import junit.framework.TestCase;

import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
import org.apache.gobblin.cluster.GobblinClusterManager;
import org.apache.gobblin.cluster.GobblinClusterManagerTest;
import org.apache.gobblin.cluster.GobblinHelixConstants;
import org.apache.gobblin.cluster.GobblinHelixMultiManager;
import org.apache.gobblin.cluster.HelixUtils;
import org.apache.gobblin.cluster.TestHelper;
import org.apache.gobblin.cluster.TestShutdownMessageHandlerFactory;
import org.apache.gobblin.util.JvmUtils;
import org.apache.gobblin.util.logs.Log4jConfigurationHelper;

import static org.mockito.Mockito.when;


public class GobblinApplicationMasterTest extends TestCase {

GobblinHelixMultiManager mockMultiManager;

public static final String HADOOP_OVERRIDE_PROPERTY_NAME = "prop";
public final static Logger LOG = LoggerFactory.getLogger(GobblinClusterManagerTest.class);
private TestingServer testingZKServer;


@BeforeClass
public void setUp() throws Exception {
// Use a random ZK port
this.testingZKServer = new TestingServer(-1);
LOG.info("Testing ZK Server listening on: " + testingZKServer.getConnectString());
}

@Test
public void testDisableTaskRunnersFromPreviousExecutions() throws Exception {
this.mockMultiManager = Mockito.mock(GobblinHelixMultiManager.class);
public void testDisableTaskRunnersFromPreviousExecutions() {
GobblinHelixMultiManager mockMultiManager = Mockito.mock(GobblinHelixMultiManager.class);

HelixManager mockHelixManager = Mockito.mock(HelixManager.class);
when(mockMultiManager.getJobClusterHelixManager()).thenReturn(mockHelixManager);
Expand All @@ -85,11 +41,12 @@ public void testDisableTaskRunnersFromPreviousExecutions() throws Exception {
PropertyKey mockLiveInstancesKey = Mockito.mock(PropertyKey.class);
when(mockBuilder.liveInstances()).thenReturn(mockLiveInstancesKey);

Map<String, HelixProperty> mockChildValuesMap = ImmutableMap.of(
"GobblinYarnTaskRunner_TestInstance_0", Mockito.mock(HelixProperty.class),
"GobblinYarnTaskRunner_TestInstance_1", Mockito.mock(HelixProperty.class),
"GobblinYarnTaskRunner_TestInstance_2", Mockito.mock(HelixProperty.class)
);
int instanceCount = 3;
Map<String, HelixProperty> mockChildValuesMap = new HashMap<>();
for (int i = 0; i < instanceCount; i++) {
mockChildValuesMap.put("GobblinYarnTaskRunner_TestInstance_" + i, Mockito.mock(HelixProperty.class));
}

when(mockAccessor.getChildValuesMap(mockLiveInstancesKey)).thenReturn(mockChildValuesMap);

ConfigAccessor mockConfigAccessor = Mockito.mock(ConfigAccessor.class);
Expand All @@ -98,48 +55,10 @@ public void testDisableTaskRunnersFromPreviousExecutions() throws Exception {
ClusterConfig mockClusterConfig = Mockito.mock(ClusterConfig.class);
when(mockConfigAccessor.getClusterConfig("GobblinApplicationMasterTest")).thenReturn(mockClusterConfig);

URL url = GobblinApplicationMasterTest.class.getClassLoader().getResource(
GobblinApplicationMasterTest.class.getSimpleName() + ".conf");
Assert.assertNotNull(url, "Could not find resource " + url);

Config config = ConfigFactory.parseURL(url)
.withValue("gobblin.yarn.zk.connection.string",
ConfigValueFactory.fromAnyRef(testingZKServer.getConnectString()))
.withValue(GobblinClusterConfigurationKeys.HELIX_TASK_QUOTA_CONFIG_KEY,
ConfigValueFactory.fromAnyRef("DEFAULT:1,OTHER:10"))
.withValue(GobblinClusterConfigurationKeys.HADOOP_CONFIG_OVERRIDES_PREFIX + "." + HADOOP_OVERRIDE_PROPERTY_NAME,
ConfigValueFactory.fromAnyRef("value"))
.withValue(GobblinClusterConfigurationKeys.HADOOP_CONFIG_OVERRIDES_PREFIX + "." + "fs.file.impl.disable.cache",
ConfigValueFactory.fromAnyRef("true"))
.resolve();

ContainerId mockContainerId = Mockito.mock(ContainerId.class);

TestGobblinApplicationMaster testGobblinApplicationMaster = new TestGobblinApplicationMaster(
GobblinApplicationMasterTest.class.getSimpleName(),
TestHelper.TEST_APPLICATION_ID,
mockContainerId,
config, new YarnConfiguration());

testGobblinApplicationMaster.start();
GobblinApplicationMaster.disableTaskRunnersFromPreviousExecutions(mockMultiManager);

for (String mockLiveInstance: mockChildValuesMap.keySet()) {
Mockito.verify(mockHelixAdmin).enableInstance("mockCluster", mockLiveInstance, false);
}
}

public class TestGobblinApplicationMaster extends GobblinApplicationMaster {

public TestGobblinApplicationMaster(String applicationName, String applicationId, ContainerId containerId,
Config config, YarnConfiguration yarnConfiguration)
throws Exception {
super(applicationName, applicationId, containerId, config, yarnConfiguration);
}

@Override
public GobblinHelixMultiManager createMultiManager() {
return mockMultiManager;
}
}

}
23 changes: 0 additions & 23 deletions gobblin-yarn/src/test/resources/GobblinApplicationMasterTest.conf

This file was deleted.

0 comments on commit 7914810

Please sign in to comment.