Skip to content

Commit

Permalink
[GOBBLIN-1841] Add edge cases check
Browse files Browse the repository at this point in the history
  • Loading branch information
Peiyingy committed Jun 27, 2023
1 parent 7914810 commit 42390b6
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -440,8 +440,7 @@ void initializeHelixManager() {
* @return a new GobblinHelixMultiManager
*/
public GobblinHelixMultiManager createMultiManager() {
return new GobblinHelixMultiManager(
this.config, aVoid -> GobblinClusterManager.this.getUserDefinedMessageHandlerFactory(), this.eventBus, stopStatus);
return new GobblinHelixMultiManager(this.config, aVoid -> GobblinClusterManager.this.getUserDefinedMessageHandlerFactory(), this.eventBus, stopStatus);
}

@VisibleForTesting
Expand Down Expand Up @@ -516,7 +515,6 @@ private static void printUsage(Options options) {
formatter.printHelp(GobblinClusterManager.class.getSimpleName(), options);
}


public static void main(String[] args) throws Exception {
Options options = buildOptions();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ public static void disableTaskRunnersFromPreviousExecutions(GobblinHelixMultiMan
Set<String> taskRunners = HelixUtils.getParticipants(helixDataAccessor,
GobblinYarnTaskRunner.HELIX_YARN_INSTANCE_NAME_PREFIX);
LOGGER.warn("Found {} task runners in the cluster.", taskRunners.size());
for (String taskRunner: taskRunners) {
for (String taskRunner : taskRunners) {
LOGGER.warn("Disabling instance: {}", taskRunner);
helixAdmin.enableInstance(clusterName, taskRunner, false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ void runInternal() {
}
// Find all participants appearing in this cluster. Note that Helix instances can contain cluster-manager
// and potentially replanner-instance.
Set<String> allParticipants = HelixUtils.getParticipants(helixDataAccessor,HELIX_YARN_INSTANCE_NAME_PREFIX);
Set<String> allParticipants = HelixUtils.getParticipants(helixDataAccessor, HELIX_YARN_INSTANCE_NAME_PREFIX);

// Find all joined participants not in-use for this round of inspection.
// If idle time is beyond tolerance, mark the instance as unused by assigning timestamp as -1.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.apache.gobblin.yarn;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;

Expand All @@ -17,6 +18,7 @@

import org.apache.gobblin.cluster.GobblinHelixMultiManager;

import static org.mockito.Mockito.times;
import static org.mockito.Mockito.when;


Expand All @@ -42,12 +44,21 @@ public void testDisableTaskRunnersFromPreviousExecutions() {
when(mockBuilder.liveInstances()).thenReturn(mockLiveInstancesKey);

int instanceCount = 3;
Map<String, HelixProperty> mockChildValuesMap = new HashMap<>();

// GobblinYarnTaskRunner prefix would be disabled, while GobblinClusterManager prefix will not
ArrayList<String> gobblinYarnTaskRunnerPrefix = new ArrayList<String>();
ArrayList<String> gobblinClusterManagerPrefix = new ArrayList<String>();
for (int i = 0; i < instanceCount; i++) {
mockChildValuesMap.put("GobblinYarnTaskRunner_TestInstance_" + i, Mockito.mock(HelixProperty.class));
gobblinYarnTaskRunnerPrefix.add("GobblinYarnTaskRunner_TestInstance_" + i);
gobblinClusterManagerPrefix.add("GobblinClusterManager_TestInstance_" + i);
}

when(mockAccessor.getChildValuesMap(mockLiveInstancesKey)).thenReturn(mockChildValuesMap);
Map<String, HelixProperty> mockChildValues = new HashMap<>();
for (int i = 0; i < instanceCount; i++) {
mockChildValues.put(gobblinYarnTaskRunnerPrefix.get(i), Mockito.mock(HelixProperty.class));
mockChildValues.put(gobblinClusterManagerPrefix.get(i), Mockito.mock(HelixProperty.class));
}
when(mockAccessor.getChildValuesMap(mockLiveInstancesKey)).thenReturn(mockChildValues);

ConfigAccessor mockConfigAccessor = Mockito.mock(ConfigAccessor.class);
when(mockHelixManager.getConfigAccessor()).thenReturn(mockConfigAccessor);
Expand All @@ -57,8 +68,9 @@ public void testDisableTaskRunnersFromPreviousExecutions() {

GobblinApplicationMaster.disableTaskRunnersFromPreviousExecutions(mockMultiManager);

for (String mockLiveInstance: mockChildValuesMap.keySet()) {
Mockito.verify(mockHelixAdmin).enableInstance("mockCluster", mockLiveInstance, false);
for (int i = 0; i < instanceCount; i++) {
Mockito.verify(mockHelixAdmin).enableInstance("mockCluster", gobblinYarnTaskRunnerPrefix.get(i), false);
Mockito.verify(mockHelixAdmin, times(0)).enableInstance("mockCluster", gobblinClusterManagerPrefix.get(i), false);
}
}
}

0 comments on commit 42390b6

Please sign in to comment.