From 7914810355a66a06880fcf2a30e97b60f23c4bbe Mon Sep 17 00:00:00 2001 From: Peiyingy Date: Mon, 26 Jun 2023 13:54:30 -0700 Subject: [PATCH] [GOBBLIN-1841] Move disableTaskRunnersFromPreviousExecutions to GobblinApplicationMaster --- build.gradle | 4 - .../cluster/GobblinClusterManager.java | 2 - .../apache/gobblin/cluster/HelixUtils.java | 6 +- .../yarn/GobblinApplicationMaster.java | 11 +- .../gobblin/yarn/GobblinYarnAppLauncher.java | 5 +- .../gobblin/yarn/YarnAutoScalingManager.java | 15 +-- .../yarn/GobblinApplicationMasterTest.java | 101 ++---------------- .../GobblinApplicationMasterTest.conf | 23 ---- 8 files changed, 22 insertions(+), 145 deletions(-) delete mode 100644 gobblin-yarn/src/test/resources/GobblinApplicationMasterTest.conf diff --git a/build.gradle b/build.gradle index f87b3348c4f..532de4d0d67 100644 --- a/build.gradle +++ b/build.gradle @@ -19,10 +19,6 @@ apply from: 'gradle/scripts/environment.gradle' buildscript { - configurations.all { - exclude group: 'org.slf4j', module: 'slf4j-log4j12' - } - apply from: 'gradle/scripts/repositories.gradle' apply from: 'gradle/scripts/defaultBuildProperties.gradle' apply from: 'gradle/scripts/computeVersions.gradle' diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java index a8e895e906a..b26611065d9 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java @@ -22,7 +22,6 @@ import java.util.Collection; import java.util.List; import java.util.Properties; -import java.util.Set; import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; @@ -36,7 +35,6 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.helix.Criteria; -import org.apache.helix.HelixAdmin; import org.apache.helix.HelixManager; import org.apache.helix.InstanceType; import org.apache.helix.messaging.handling.MultiTypeMessageHandlerFactory; diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java index 62b0128b0b4..5614b7146e9 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java @@ -39,7 +39,6 @@ import org.apache.helix.manager.zk.ZKHelixManager; import org.apache.helix.model.HelixConfigScope; import org.apache.helix.model.InstanceConfig; -import org.apache.helix.model.LiveInstance; import org.apache.helix.task.JobConfig; import org.apache.helix.task.JobContext; import org.apache.helix.task.TargetState; @@ -453,11 +452,10 @@ public static List getLiveInstances(HelixManager helixManager) { /** * Getting all instances (Helix Participants) in cluster at this moment. - * Note that the raw result could contains AppMaster node and replanner node. + * Note that the raw result could contain AppMaster node and replanner node. * @param filterString Helix instances whose name containing fitlerString will pass filtering. */ - public static Set getParticipants(HelixManager helixManager, String filterString) { - HelixDataAccessor helixDataAccessor = helixManager.getHelixDataAccessor(); + public static Set getParticipants(HelixDataAccessor helixDataAccessor, String filterString) { PropertyKey.Builder keyBuilder = helixDataAccessor.keyBuilder(); PropertyKey liveInstance = keyBuilder.liveInstances(); Map childValuesMap = helixDataAccessor.getChildValuesMap(liveInstance); diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java index 11c77095d8c..55f066c84aa 100644 --- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java +++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java @@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.helix.HelixAdmin; +import org.apache.helix.HelixDataAccessor; import org.apache.helix.HelixManager; import org.apache.helix.NotificationContext; import org.apache.helix.messaging.handling.HelixTaskResult; @@ -55,6 +56,7 @@ import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys; import org.apache.gobblin.cluster.GobblinClusterManager; import org.apache.gobblin.cluster.GobblinClusterUtils; +import org.apache.gobblin.cluster.GobblinHelixMultiManager; import org.apache.gobblin.cluster.HelixUtils; import org.apache.gobblin.util.ConfigUtils; import org.apache.gobblin.util.JvmUtils; @@ -142,14 +144,15 @@ protected MultiTypeMessageHandlerFactory getUserDefinedMessageHandlerFactory() { @Override public synchronized void setupHelix() { super.setupHelix(); - this.disableTaskRunnersFromPreviousExecutions(); + this.disableTaskRunnersFromPreviousExecutions(this.multiManager); } - public void disableTaskRunnersFromPreviousExecutions() { - HelixManager helixManager = this.multiManager.getJobClusterHelixManager(); + public static void disableTaskRunnersFromPreviousExecutions(GobblinHelixMultiManager multiManager) { + HelixManager helixManager = multiManager.getJobClusterHelixManager(); + HelixDataAccessor helixDataAccessor = helixManager.getHelixDataAccessor(); String clusterName = helixManager.getClusterName(); HelixAdmin helixAdmin = helixManager.getClusterManagmentTool(); - Set taskRunners = HelixUtils.getParticipants(helixManager, + Set taskRunners = HelixUtils.getParticipants(helixDataAccessor, GobblinYarnTaskRunner.HELIX_YARN_INSTANCE_NAME_PREFIX); LOGGER.warn("Found {} task runners in the cluster.", taskRunners.size()); for (String taskRunner: taskRunners) { diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java index 16732595cbe..48ac8947972 100644 --- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java +++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java @@ -38,8 +38,6 @@ import org.apache.avro.Schema; import org.apache.commons.io.FileUtils; import org.apache.commons.mail.EmailException; -import org.apache.gobblin.util.hadoop.TokenUtils; -import org.apache.gobblin.util.reflection.GobblinConstructorUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -71,7 +69,6 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.util.Records; import org.apache.helix.Criteria; -import org.apache.helix.HelixAdmin; import org.apache.helix.HelixManager; import org.apache.helix.HelixManagerFactory; import org.apache.helix.InstanceType; @@ -117,8 +114,10 @@ import org.apache.gobblin.util.EmailUtils; import org.apache.gobblin.util.ExecutorsUtils; import org.apache.gobblin.util.JvmUtils; +import org.apache.gobblin.util.hadoop.TokenUtils; import org.apache.gobblin.util.io.StreamUtils; import org.apache.gobblin.util.logs.LogCopier; +import org.apache.gobblin.util.reflection.GobblinConstructorUtils; import org.apache.gobblin.yarn.event.ApplicationReportArrivalEvent; import org.apache.gobblin.yarn.event.GetApplicationReportFailureEvent; diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java index 4303d98dbb6..5abc0f7e911 100644 --- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java +++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java @@ -30,12 +30,10 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; - import org.apache.commons.compress.utils.Sets; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.helix.HelixDataAccessor; import org.apache.helix.HelixManager; -import org.apache.helix.PropertyKey; import org.apache.helix.task.JobConfig; import org.apache.helix.task.JobContext; import org.apache.helix.task.JobDag; @@ -185,17 +183,6 @@ public void run() { } } - /** - * Getting all instances (Helix Participants) in cluster at this moment. - * Note that the raw result could contains AppMaster node and replanner node. - * @param filterString Helix instances whose name containing fitlerString will pass filtering. - */ - private Set getParticipants(String filterString) { - PropertyKey.Builder keyBuilder = helixDataAccessor.keyBuilder(); - return helixDataAccessor.getChildValuesMap(keyBuilder.liveInstances()) - .keySet().stream().filter(x -> filterString.isEmpty() || x.contains(filterString)).collect(Collectors.toSet()); - } - private String getInuseParticipantForHelixPartition(JobContext jobContext, int partition) { if (jobContext.getPartitionNumAttempts(partition) > THRESHOLD_NUMBER_OF_ATTEMPTS_FOR_LOGGING) { log.warn("Helix task {} has been retried for {} times, please check the config to see how we can handle this task better", @@ -273,7 +260,7 @@ void runInternal() { } // Find all participants appearing in this cluster. Note that Helix instances can contain cluster-manager // and potentially replanner-instance. - Set allParticipants = HelixUtils.getParticipants(helixManager,HELIX_YARN_INSTANCE_NAME_PREFIX); + Set allParticipants = HelixUtils.getParticipants(helixDataAccessor,HELIX_YARN_INSTANCE_NAME_PREFIX); // Find all joined participants not in-use for this round of inspection. // If idle time is beyond tolerance, mark the instance as unused by assigning timestamp as -1. diff --git a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinApplicationMasterTest.java b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinApplicationMasterTest.java index a92ce9132a9..80d28fad696 100644 --- a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinApplicationMasterTest.java +++ b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinApplicationMasterTest.java @@ -1,73 +1,29 @@ package org.apache.gobblin.yarn; -import java.net.URL; +import java.util.HashMap; import java.util.Map; -import org.apache.commons.cli.Options; -import org.apache.curator.test.TestingServer; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.yarn.api.ApplicationConstants; -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.helix.ConfigAccessor; import org.apache.helix.HelixAdmin; import org.apache.helix.HelixDataAccessor; import org.apache.helix.HelixManager; -import org.apache.helix.HelixManagerFactory; import org.apache.helix.HelixProperty; -import org.apache.helix.InstanceType; import org.apache.helix.PropertyKey; import org.apache.helix.model.ClusterConfig; import org.mockito.Mockito; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.testng.Assert; -import org.testng.annotations.AfterClass; -import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; -import com.google.common.base.Optional; -import com.google.common.collect.ImmutableMap; -import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; -import com.typesafe.config.ConfigValueFactory; - import junit.framework.TestCase; -import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys; -import org.apache.gobblin.cluster.GobblinClusterManager; -import org.apache.gobblin.cluster.GobblinClusterManagerTest; -import org.apache.gobblin.cluster.GobblinHelixConstants; import org.apache.gobblin.cluster.GobblinHelixMultiManager; -import org.apache.gobblin.cluster.HelixUtils; -import org.apache.gobblin.cluster.TestHelper; -import org.apache.gobblin.cluster.TestShutdownMessageHandlerFactory; -import org.apache.gobblin.util.JvmUtils; -import org.apache.gobblin.util.logs.Log4jConfigurationHelper; import static org.mockito.Mockito.when; public class GobblinApplicationMasterTest extends TestCase { - - GobblinHelixMultiManager mockMultiManager; - - public static final String HADOOP_OVERRIDE_PROPERTY_NAME = "prop"; - public final static Logger LOG = LoggerFactory.getLogger(GobblinClusterManagerTest.class); - private TestingServer testingZKServer; - - - @BeforeClass - public void setUp() throws Exception { - // Use a random ZK port - this.testingZKServer = new TestingServer(-1); - LOG.info("Testing ZK Server listening on: " + testingZKServer.getConnectString()); - } - @Test - public void testDisableTaskRunnersFromPreviousExecutions() throws Exception { - this.mockMultiManager = Mockito.mock(GobblinHelixMultiManager.class); + public void testDisableTaskRunnersFromPreviousExecutions() { + GobblinHelixMultiManager mockMultiManager = Mockito.mock(GobblinHelixMultiManager.class); HelixManager mockHelixManager = Mockito.mock(HelixManager.class); when(mockMultiManager.getJobClusterHelixManager()).thenReturn(mockHelixManager); @@ -85,11 +41,12 @@ public void testDisableTaskRunnersFromPreviousExecutions() throws Exception { PropertyKey mockLiveInstancesKey = Mockito.mock(PropertyKey.class); when(mockBuilder.liveInstances()).thenReturn(mockLiveInstancesKey); - Map mockChildValuesMap = ImmutableMap.of( - "GobblinYarnTaskRunner_TestInstance_0", Mockito.mock(HelixProperty.class), - "GobblinYarnTaskRunner_TestInstance_1", Mockito.mock(HelixProperty.class), - "GobblinYarnTaskRunner_TestInstance_2", Mockito.mock(HelixProperty.class) - ); + int instanceCount = 3; + Map mockChildValuesMap = new HashMap<>(); + for (int i = 0; i < instanceCount; i++) { + mockChildValuesMap.put("GobblinYarnTaskRunner_TestInstance_" + i, Mockito.mock(HelixProperty.class)); + } + when(mockAccessor.getChildValuesMap(mockLiveInstancesKey)).thenReturn(mockChildValuesMap); ConfigAccessor mockConfigAccessor = Mockito.mock(ConfigAccessor.class); @@ -98,48 +55,10 @@ public void testDisableTaskRunnersFromPreviousExecutions() throws Exception { ClusterConfig mockClusterConfig = Mockito.mock(ClusterConfig.class); when(mockConfigAccessor.getClusterConfig("GobblinApplicationMasterTest")).thenReturn(mockClusterConfig); - URL url = GobblinApplicationMasterTest.class.getClassLoader().getResource( - GobblinApplicationMasterTest.class.getSimpleName() + ".conf"); - Assert.assertNotNull(url, "Could not find resource " + url); - - Config config = ConfigFactory.parseURL(url) - .withValue("gobblin.yarn.zk.connection.string", - ConfigValueFactory.fromAnyRef(testingZKServer.getConnectString())) - .withValue(GobblinClusterConfigurationKeys.HELIX_TASK_QUOTA_CONFIG_KEY, - ConfigValueFactory.fromAnyRef("DEFAULT:1,OTHER:10")) - .withValue(GobblinClusterConfigurationKeys.HADOOP_CONFIG_OVERRIDES_PREFIX + "." + HADOOP_OVERRIDE_PROPERTY_NAME, - ConfigValueFactory.fromAnyRef("value")) - .withValue(GobblinClusterConfigurationKeys.HADOOP_CONFIG_OVERRIDES_PREFIX + "." + "fs.file.impl.disable.cache", - ConfigValueFactory.fromAnyRef("true")) - .resolve(); - - ContainerId mockContainerId = Mockito.mock(ContainerId.class); - - TestGobblinApplicationMaster testGobblinApplicationMaster = new TestGobblinApplicationMaster( - GobblinApplicationMasterTest.class.getSimpleName(), - TestHelper.TEST_APPLICATION_ID, - mockContainerId, - config, new YarnConfiguration()); - - testGobblinApplicationMaster.start(); + GobblinApplicationMaster.disableTaskRunnersFromPreviousExecutions(mockMultiManager); for (String mockLiveInstance: mockChildValuesMap.keySet()) { Mockito.verify(mockHelixAdmin).enableInstance("mockCluster", mockLiveInstance, false); } } - - public class TestGobblinApplicationMaster extends GobblinApplicationMaster { - - public TestGobblinApplicationMaster(String applicationName, String applicationId, ContainerId containerId, - Config config, YarnConfiguration yarnConfiguration) - throws Exception { - super(applicationName, applicationId, containerId, config, yarnConfiguration); - } - - @Override - public GobblinHelixMultiManager createMultiManager() { - return mockMultiManager; - } - } - } \ No newline at end of file diff --git a/gobblin-yarn/src/test/resources/GobblinApplicationMasterTest.conf b/gobblin-yarn/src/test/resources/GobblinApplicationMasterTest.conf deleted file mode 100644 index ae4bd248890..00000000000 --- a/gobblin-yarn/src/test/resources/GobblinApplicationMasterTest.conf +++ /dev/null @@ -1,23 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -include "reference" - -# Cluster / Helix configuration properties -gobblin.yarn.helix.cluster.name=GobblinApplicationMasterTest -gobblin.yarn.work.dir=GobblinApplicationMasterTest -gobblin.yarn.zk.connection.string="localhost:3085"