diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java index e6683cfd383..a2f4d8372fc 100644 --- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java +++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java @@ -30,7 +30,6 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; - import org.apache.commons.compress.utils.Sets; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.helix.HelixDataAccessor; @@ -39,6 +38,7 @@ import org.apache.helix.task.JobConfig; import org.apache.helix.task.JobContext; import org.apache.helix.task.JobDag; +import org.apache.helix.task.TargetState; import org.apache.helix.task.TaskDriver; import org.apache.helix.task.TaskPartitionState; import org.apache.helix.task.TaskState; @@ -220,16 +220,19 @@ void runInternal() { YarnContainerRequestBundle yarnContainerRequestBundle = new YarnContainerRequestBundle(); for (Map.Entry workFlowEntry : taskDriver.getWorkflows().entrySet()) { WorkflowContext workflowContext = taskDriver.getWorkflowContext(workFlowEntry.getKey()); + WorkflowConfig workflowConfig = workFlowEntry.getValue(); - // Only allocate for active workflows - if (workflowContext == null || !workflowContext.getWorkflowState().equals(TaskState.IN_PROGRESS)) { + // Only allocate for active workflows. Those marked for deletion are ignored but the existing containers won't + // be released until maxIdleTimeInMinutesBeforeScalingDown + if (workflowContext == null || + TargetState.DELETE.equals(workflowConfig.getTargetState()) || + !workflowContext.getWorkflowState().equals(TaskState.IN_PROGRESS)) { continue; } log.debug("Workflow name {} config {} context {}", workFlowEntry.getKey(), workFlowEntry.getValue(), workflowContext); - WorkflowConfig workflowConfig = workFlowEntry.getValue(); JobDag jobDag = workflowConfig.getJobDag(); Set jobs = jobDag.getAllNodes(); diff --git a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnAutoScalingManagerTest.java b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnAutoScalingManagerTest.java index 687af96fd83..10563c2bbef 100644 --- a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnAutoScalingManagerTest.java +++ b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnAutoScalingManagerTest.java @@ -17,10 +17,11 @@ package org.apache.gobblin.yarn; -import java.io.IOException; -import java.util.HashMap; +import java.util.Arrays; +import java.util.List; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.helix.HelixDataAccessor; @@ -29,6 +30,7 @@ import org.apache.helix.task.JobConfig; import org.apache.helix.task.JobContext; import org.apache.helix.task.JobDag; +import org.apache.helix.task.TargetState; import org.apache.helix.task.TaskDriver; import org.apache.helix.task.TaskState; import org.apache.helix.task.WorkflowConfig; @@ -63,34 +65,16 @@ public class YarnAutoScalingManagerTest { * Test for one workflow with one job */ @Test - public void testOneJob() throws IOException { + public void testOneJob() { YarnService mockYarnService = mock(YarnService.class); TaskDriver mockTaskDriver = mock(TaskDriver.class); - WorkflowConfig mockWorkflowConfig = mock(WorkflowConfig.class); - JobDag mockJobDag = mock(JobDag.class); + WorkflowConfig mockWorkflowConfig = + getWorkflowConfig(mockTaskDriver, ImmutableSet.of("job1"), TaskState.IN_PROGRESS, TargetState.START, "workflow1"); + Mockito.when(mockTaskDriver.getWorkflows()).thenReturn(ImmutableMap.of("workflow1", mockWorkflowConfig)); - Mockito.when(mockJobDag.getAllNodes()).thenReturn(ImmutableSet.of("job1")); - Mockito.when(mockWorkflowConfig.getJobDag()).thenReturn(mockJobDag); + getJobContext(mockTaskDriver, ImmutableMap.of(2, "GobblinYarnTaskRunner-1"), "job1", ImmutableSet.of(1, 2)); - Mockito.when(mockTaskDriver.getWorkflows()) - .thenReturn(ImmutableMap.of("workflow1", mockWorkflowConfig)); - - WorkflowContext mockWorkflowContext = mock(WorkflowContext.class); - Mockito.when(mockWorkflowContext.getWorkflowState()).thenReturn(TaskState.IN_PROGRESS); - - Mockito.when(mockTaskDriver.getWorkflowContext("workflow1")).thenReturn(mockWorkflowContext); - - JobContext mockJobContext = mock(JobContext.class); - Mockito.when(mockJobContext.getPartitionSet()) - .thenReturn(ImmutableSet.of(Integer.valueOf(1), Integer.valueOf(2))); - Mockito.when(mockJobContext.getAssignedParticipant(2)).thenReturn("GobblinYarnTaskRunner-1"); - - Mockito.when(mockTaskDriver.getJobContext("job1")).thenReturn(mockJobContext); - - HelixDataAccessor helixDataAccessor = mock(HelixDataAccessor.class); - Mockito.when(helixDataAccessor.keyBuilder()).thenReturn(new PropertyKey.Builder("cluster")); - Mockito.when(helixDataAccessor.getChildValuesMap(Mockito.any())) - .thenReturn(ImmutableMap.of("GobblinYarnTaskRunner-1", new HelixProperty(""))); + HelixDataAccessor helixDataAccessor = getHelixDataAccessor(Arrays.asList("GobblinClusterManager", "GobblinYarnTaskRunner-1")); YarnAutoScalingManager.YarnAutoScalingRunnable runnable = new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, mockYarnService, 1, @@ -109,40 +93,18 @@ public void testOneJob() throws IOException { * Test for one workflow with two jobs */ @Test - public void testTwoJobs() throws IOException { + public void testTwoJobs() { YarnService mockYarnService = mock(YarnService.class); TaskDriver mockTaskDriver = mock(TaskDriver.class); - WorkflowConfig mockWorkflowConfig = mock(WorkflowConfig.class); - JobDag mockJobDag = mock(JobDag.class); - - Mockito.when(mockJobDag.getAllNodes()).thenReturn(ImmutableSet.of("job1", "job2")); - Mockito.when(mockWorkflowConfig.getJobDag()).thenReturn(mockJobDag); - + WorkflowConfig mockWorkflowConfig = + getWorkflowConfig(mockTaskDriver, ImmutableSet.of("job1", "job2"), TaskState.IN_PROGRESS, TargetState.START, "workflow1"); Mockito.when(mockTaskDriver.getWorkflows()) .thenReturn(ImmutableMap.of("workflow1", mockWorkflowConfig)); - WorkflowContext mockWorkflowContext = mock(WorkflowContext.class); - Mockito.when(mockWorkflowContext.getWorkflowState()).thenReturn(TaskState.IN_PROGRESS); + getJobContext(mockTaskDriver, ImmutableMap.of(2, "GobblinYarnTaskRunner-1"), "job1", ImmutableSet.of(1, 2)); + getJobContext(mockTaskDriver, ImmutableMap.of(3, "GobblinYarnTaskRunner-2"), "job2"); - Mockito.when(mockTaskDriver.getWorkflowContext("workflow1")).thenReturn(mockWorkflowContext); - - JobContext mockJobContext1 = mock(JobContext.class); - Mockito.when(mockJobContext1.getPartitionSet()) - .thenReturn(ImmutableSet.of(Integer.valueOf(1), Integer.valueOf(2))); - Mockito.when(mockJobContext1.getAssignedParticipant(2)).thenReturn("GobblinYarnTaskRunner-1"); - Mockito.when(mockTaskDriver.getJobContext("job1")).thenReturn(mockJobContext1); - - JobContext mockJobContext2 = mock(JobContext.class); - Mockito.when(mockJobContext2.getPartitionSet()) - .thenReturn(ImmutableSet.of(Integer.valueOf(3))); - Mockito.when(mockJobContext2.getAssignedParticipant(3)).thenReturn("GobblinYarnTaskRunner-2"); - Mockito.when(mockTaskDriver.getJobContext("job2")).thenReturn(mockJobContext2); - - HelixDataAccessor helixDataAccessor = mock(HelixDataAccessor.class); - Mockito.when(helixDataAccessor.keyBuilder()).thenReturn(new PropertyKey.Builder("cluster")); - Mockito.when(helixDataAccessor.getChildValuesMap(Mockito.any())) - .thenReturn(ImmutableMap.of("GobblinYarnTaskRunner-1", new HelixProperty(""), - "GobblinYarnTaskRunner-2", new HelixProperty(""))); + HelixDataAccessor helixDataAccessor = getHelixDataAccessor(Arrays.asList("GobblinYarnTaskRunner-1", "GobblinYarnTaskRunner-2")); YarnAutoScalingManager.YarnAutoScalingRunnable runnable = new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, mockYarnService, 1, @@ -162,59 +124,24 @@ public void testTwoJobs() throws IOException { * Test for two workflows */ @Test - public void testTwoWorkflows() throws IOException { + public void testTwoWorkflows() { YarnService mockYarnService = mock(YarnService.class); TaskDriver mockTaskDriver = mock(TaskDriver.class); - WorkflowConfig mockWorkflowConfig1 = mock(WorkflowConfig.class); - JobDag mockJobDag1 = mock(JobDag.class); - - Mockito.when(mockJobDag1.getAllNodes()).thenReturn(ImmutableSet.of("job1", "job2")); - Mockito.when(mockWorkflowConfig1.getJobDag()).thenReturn(mockJobDag1); - - WorkflowContext mockWorkflowContext1 = mock(WorkflowContext.class); - Mockito.when(mockWorkflowContext1.getWorkflowState()).thenReturn(TaskState.IN_PROGRESS); - - Mockito.when(mockTaskDriver.getWorkflowContext("workflow1")).thenReturn(mockWorkflowContext1); - - JobContext mockJobContext1 = mock(JobContext.class); - Mockito.when(mockJobContext1.getPartitionSet()) - .thenReturn(ImmutableSet.of(Integer.valueOf(1), Integer.valueOf(2))); - Mockito.when(mockJobContext1.getAssignedParticipant(2)).thenReturn("GobblinYarnTaskRunner-1"); - Mockito.when(mockTaskDriver.getJobContext("job1")).thenReturn(mockJobContext1); - - JobContext mockJobContext2 = mock(JobContext.class); - Mockito.when(mockJobContext2.getPartitionSet()) - .thenReturn(ImmutableSet.of(Integer.valueOf(3))); - Mockito.when(mockJobContext2.getAssignedParticipant(3)).thenReturn("GobblinYarnTaskRunner-2"); - Mockito.when(mockTaskDriver.getJobContext("job2")).thenReturn(mockJobContext2); - - WorkflowConfig mockWorkflowConfig2 = mock(WorkflowConfig.class); - JobDag mockJobDag2 = mock(JobDag.class); - - Mockito.when(mockJobDag2.getAllNodes()).thenReturn(ImmutableSet.of("job3")); - Mockito.when(mockWorkflowConfig2.getJobDag()).thenReturn(mockJobDag2); - - WorkflowContext mockWorkflowContext2 = mock(WorkflowContext.class); - Mockito.when(mockWorkflowContext2.getWorkflowState()).thenReturn(TaskState.IN_PROGRESS); - - Mockito.when(mockTaskDriver.getWorkflowContext("workflow2")).thenReturn(mockWorkflowContext2); - - JobContext mockJobContext3 = mock(JobContext.class); - Mockito.when(mockJobContext3.getPartitionSet()) - .thenReturn(ImmutableSet.of(Integer.valueOf(4), Integer.valueOf(5))); - Mockito.when(mockJobContext3.getAssignedParticipant(4)).thenReturn("GobblinYarnTaskRunner-3"); - Mockito.when(mockTaskDriver.getJobContext("job3")).thenReturn(mockJobContext3); + WorkflowConfig mockWorkflowConfig1 = + getWorkflowConfig(mockTaskDriver, ImmutableSet.of("job1", "job2"), TaskState.IN_PROGRESS, TargetState.START, "workflow1"); + WorkflowConfig mockWorkflowConfig2 = + getWorkflowConfig(mockTaskDriver, ImmutableSet.of("job3"), TaskState.IN_PROGRESS, TargetState.START, "workflow2"); Mockito.when(mockTaskDriver.getWorkflows()) .thenReturn(ImmutableMap.of("workflow1", mockWorkflowConfig1, "workflow2", mockWorkflowConfig2)); - HelixDataAccessor helixDataAccessor = mock(HelixDataAccessor.class); - Mockito.when(helixDataAccessor.keyBuilder()).thenReturn(new PropertyKey.Builder("cluster")); - Mockito.when(helixDataAccessor.getChildValuesMap(Mockito.any())) - .thenReturn(ImmutableMap.of("GobblinYarnTaskRunner-1", new HelixProperty(""), - "GobblinYarnTaskRunner-2", new HelixProperty(""), - "GobblinYarnTaskRunner-3", new HelixProperty(""))); + getJobContext(mockTaskDriver, ImmutableMap.of(2, "GobblinYarnTaskRunner-1"), "job1", ImmutableSet.of(1, 2)); + getJobContext(mockTaskDriver, ImmutableMap.of(3, "GobblinYarnTaskRunner-2"), "job2"); + getJobContext(mockTaskDriver, ImmutableMap.of(4, "GobblinYarnTaskRunner-3"), "job3", ImmutableSet.of(4,5)); + + HelixDataAccessor helixDataAccessor = getHelixDataAccessor(Arrays.asList( + "GobblinYarnTaskRunner-1", "GobblinYarnTaskRunner-2","GobblinYarnTaskRunner-3")); YarnAutoScalingManager.YarnAutoScalingRunnable runnable = new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, mockYarnService, 1, @@ -223,70 +150,37 @@ public void testTwoWorkflows() throws IOException { runnable.run(); // 5 containers requested and 3 workers in use - ArgumentCaptor argument = ArgumentCaptor.forClass(YarnContainerRequestBundle.class); - Mockito.verify(mockYarnService, times(1)). - requestTargetNumberOfContainers(argument.capture(), - eq(ImmutableSet.of("GobblinYarnTaskRunner-1", "GobblinYarnTaskRunner-2", "GobblinYarnTaskRunner-3"))); - Assert.assertEquals(argument.getValue().getTotalContainers(), 5); + assertContainerRequest(mockYarnService, 5, + ImmutableSet.of("GobblinYarnTaskRunner-1", "GobblinYarnTaskRunner-2", "GobblinYarnTaskRunner-3")); } /** - * Test for two workflows with one not in progress. - * The partitions for the workflow that is not in progress should not be counted. + * Test for three workflows with one not in progress and one marked for delete. + * The partitions for the workflow that is not in progress or is marked for delete should not be counted. */ @Test - public void testNotInProgress() throws IOException { + public void testNotInProgressOrBeingDeleted() { YarnService mockYarnService = mock(YarnService.class); TaskDriver mockTaskDriver = mock(TaskDriver.class); - WorkflowConfig mockWorkflowConfig1 = mock(WorkflowConfig.class); - JobDag mockJobDag1 = mock(JobDag.class); - - Mockito.when(mockJobDag1.getAllNodes()).thenReturn(ImmutableSet.of("job1", "job2")); - Mockito.when(mockWorkflowConfig1.getJobDag()).thenReturn(mockJobDag1); - - WorkflowContext mockWorkflowContext1 = mock(WorkflowContext.class); - Mockito.when(mockWorkflowContext1.getWorkflowState()).thenReturn(TaskState.IN_PROGRESS); - - Mockito.when(mockTaskDriver.getWorkflowContext("workflow1")).thenReturn(mockWorkflowContext1); - - JobContext mockJobContext1 = mock(JobContext.class); - Mockito.when(mockJobContext1.getPartitionSet()) - .thenReturn(ImmutableSet.of(Integer.valueOf(1), Integer.valueOf(2))); - Mockito.when(mockJobContext1.getAssignedParticipant(2)).thenReturn("GobblinYarnTaskRunner-1"); - Mockito.when(mockTaskDriver.getJobContext("job1")).thenReturn(mockJobContext1); - - JobContext mockJobContext2 = mock(JobContext.class); - Mockito.when(mockJobContext2.getPartitionSet()) - .thenReturn(ImmutableSet.of(Integer.valueOf(3))); - Mockito.when(mockJobContext2.getAssignedParticipant(3)).thenReturn("GobblinYarnTaskRunner-2"); - Mockito.when(mockTaskDriver.getJobContext("job2")).thenReturn(mockJobContext2); - - WorkflowConfig mockWorkflowConfig2 = mock(WorkflowConfig.class); - JobDag mockJobDag2 = mock(JobDag.class); - - Mockito.when(mockJobDag2.getAllNodes()).thenReturn(ImmutableSet.of("job3")); - Mockito.when(mockWorkflowConfig2.getJobDag()).thenReturn(mockJobDag2); - - WorkflowContext mockWorkflowContext2 = mock(WorkflowContext.class); - Mockito.when(mockWorkflowContext2.getWorkflowState()).thenReturn(TaskState.COMPLETED); - - Mockito.when(mockTaskDriver.getWorkflowContext("workflow2")).thenReturn(mockWorkflowContext2); - - JobContext mockJobContext3 = mock(JobContext.class); - Mockito.when(mockJobContext3.getPartitionSet()) - .thenReturn(ImmutableSet.of(Integer.valueOf(4), Integer.valueOf(5))); - Mockito.when(mockJobContext3.getAssignedParticipant(4)).thenReturn("GobblinYarnTaskRunner-3"); - Mockito.when(mockTaskDriver.getJobContext("job3")).thenReturn(mockJobContext3); - - Mockito.when(mockTaskDriver.getWorkflows()) - .thenReturn(ImmutableMap.of("workflow1", mockWorkflowConfig1, "workflow2", mockWorkflowConfig2)); - - HelixDataAccessor helixDataAccessor = mock(HelixDataAccessor.class); - Mockito.when(helixDataAccessor.keyBuilder()).thenReturn(new PropertyKey.Builder("cluster")); - Mockito.when(helixDataAccessor.getChildValuesMap(Mockito.any())) - .thenReturn(ImmutableMap.of("GobblinYarnTaskRunner-1", new HelixProperty(""), - "GobblinYarnTaskRunner-2", new HelixProperty(""))); + WorkflowConfig workflowInProgress = getWorkflowConfig(mockTaskDriver, ImmutableSet.of("job-inProgress-1", "job-inProgress-2"), TaskState.IN_PROGRESS, TargetState.START, "workflowInProgress"); + WorkflowConfig workflowCompleted = getWorkflowConfig(mockTaskDriver, ImmutableSet.of("job-complete-1"), TaskState.COMPLETED, TargetState.STOP, "workflowCompleted"); + WorkflowConfig workflowSetToBeDeleted = getWorkflowConfig(mockTaskDriver, ImmutableSet.of("job-setToDelete-1"), TaskState.IN_PROGRESS, TargetState.DELETE, "workflowSetToBeDeleted"); + Mockito.when(mockTaskDriver.getWorkflows()).thenReturn(ImmutableMap.of( + "workflowInProgress", workflowInProgress, + "workflowCompleted", workflowCompleted, + "workflowSetToBeDeleted", workflowSetToBeDeleted)); + + getJobContext(mockTaskDriver, ImmutableMap.of(1, "GobblinYarnTaskRunner-1"), "job-inProgress-1", + ImmutableSet.of(1,2)); + getJobContext(mockTaskDriver, ImmutableMap.of(2, "GobblinYarnTaskRunner-2"), "job-inProgress-2"); + getJobContext(mockTaskDriver, ImmutableMap.of(1, "GobblinYarnTaskRunner-3"), "job-setToDelete-1"); + getJobContext(mockTaskDriver, ImmutableMap.of(1, "GobblinYarnTaskRunner-4"), "job-complete-1", + ImmutableSet.of(1, 5)); + + HelixDataAccessor helixDataAccessor = getHelixDataAccessor( + Arrays.asList("GobblinClusterManager", + "GobblinYarnTaskRunner-1", "GobblinYarnTaskRunner-2", "GobblinYarnTaskRunner-3", "GobblinYarnTaskRunner-4")); YarnAutoScalingManager.YarnAutoScalingRunnable runnable = new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, mockYarnService, 1, @@ -294,46 +188,26 @@ public void testNotInProgress() throws IOException { runnable.run(); - // 3 containers requested and 2 workers in use - ArgumentCaptor argument = ArgumentCaptor.forClass(YarnContainerRequestBundle.class); - Mockito.verify(mockYarnService, times(1)). - requestTargetNumberOfContainers(argument.capture(), - eq(ImmutableSet.of("GobblinYarnTaskRunner-1", "GobblinYarnTaskRunner-2"))); - Assert.assertEquals(argument.getValue().getTotalContainers(), 3); + // 3 containers requested and 4 workers in use + assertContainerRequest(mockYarnService, 3, + ImmutableSet.of("GobblinYarnTaskRunner-1", "GobblinYarnTaskRunner-2", "GobblinYarnTaskRunner-3", "GobblinYarnTaskRunner-4")); } /** * Test multiple partitions to one container */ @Test - public void testMultiplePartitionsPerContainer() throws IOException { + public void testMultiplePartitionsPerContainer() { YarnService mockYarnService = mock(YarnService.class); TaskDriver mockTaskDriver = mock(TaskDriver.class); - WorkflowConfig mockWorkflowConfig = mock(WorkflowConfig.class); - JobDag mockJobDag = mock(JobDag.class); - - Mockito.when(mockJobDag.getAllNodes()).thenReturn(ImmutableSet.of("job1")); - Mockito.when(mockWorkflowConfig.getJobDag()).thenReturn(mockJobDag); + WorkflowConfig mockWorkflowConfig = + getWorkflowConfig(mockTaskDriver, ImmutableSet.of("job1"), TaskState.IN_PROGRESS, TargetState.START, "workflow1"); Mockito.when(mockTaskDriver.getWorkflows()) .thenReturn(ImmutableMap.of("workflow1", mockWorkflowConfig)); - WorkflowContext mockWorkflowContext = mock(WorkflowContext.class); - Mockito.when(mockWorkflowContext.getWorkflowState()).thenReturn(TaskState.IN_PROGRESS); - - Mockito.when(mockTaskDriver.getWorkflowContext("workflow1")).thenReturn(mockWorkflowContext); - - JobContext mockJobContext = mock(JobContext.class); - Mockito.when(mockJobContext.getPartitionSet()) - .thenReturn(ImmutableSet.of(Integer.valueOf(1), Integer.valueOf(2))); - Mockito.when(mockJobContext.getAssignedParticipant(2)).thenReturn("GobblinYarnTaskRunner-1"); - - Mockito.when(mockTaskDriver.getJobContext("job1")).thenReturn(mockJobContext); - - HelixDataAccessor helixDataAccessor = mock(HelixDataAccessor.class); - Mockito.when(helixDataAccessor.keyBuilder()).thenReturn(new PropertyKey.Builder("cluster")); - Mockito.when(helixDataAccessor.getChildValuesMap(Mockito.any())) - .thenReturn(ImmutableMap.of("GobblinYarnTaskRunner-1", new HelixProperty(""))); + getJobContext(mockTaskDriver, ImmutableMap.of(2, "GobblinYarnTaskRunner-1"), "job1"); + HelixDataAccessor helixDataAccessor = getHelixDataAccessor(Arrays.asList("GobblinYarnTaskRunner-1")); YarnAutoScalingManager.YarnAutoScalingRunnable runnable = new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, mockYarnService, 2, @@ -342,42 +216,21 @@ public void testMultiplePartitionsPerContainer() throws IOException { runnable.run(); // 1 container requested since 2 partitions and limit is 2 partitions per container. One worker in use. - ArgumentCaptor argument = ArgumentCaptor.forClass(YarnContainerRequestBundle.class); - Mockito.verify(mockYarnService, times(1)). - requestTargetNumberOfContainers(argument.capture(), - eq(ImmutableSet.of("GobblinYarnTaskRunner-1"))); - Assert.assertEquals(argument.getValue().getTotalContainers(), 1); + assertContainerRequest(mockYarnService, 1, ImmutableSet.of("GobblinYarnTaskRunner-1")); } @Test public void testOverprovision() { YarnService mockYarnService = mock(YarnService.class); TaskDriver mockTaskDriver = mock(TaskDriver.class); - WorkflowConfig mockWorkflowConfig = mock(WorkflowConfig.class); - JobDag mockJobDag = mock(JobDag.class); - - Mockito.when(mockJobDag.getAllNodes()).thenReturn(ImmutableSet.of("job1")); - Mockito.when(mockWorkflowConfig.getJobDag()).thenReturn(mockJobDag); - + WorkflowConfig mockWorkflowConfig = + getWorkflowConfig(mockTaskDriver, ImmutableSet.of("job1"), TaskState.IN_PROGRESS, TargetState.START, "workflow1"); Mockito.when(mockTaskDriver.getWorkflows()) .thenReturn(ImmutableMap.of("workflow1", mockWorkflowConfig)); - WorkflowContext mockWorkflowContext = mock(WorkflowContext.class); - Mockito.when(mockWorkflowContext.getWorkflowState()).thenReturn(TaskState.IN_PROGRESS); - - Mockito.when(mockTaskDriver.getWorkflowContext("workflow1")).thenReturn(mockWorkflowContext); - - JobContext mockJobContext = mock(JobContext.class); - Mockito.when(mockJobContext.getPartitionSet()) - .thenReturn(ImmutableSet.of(Integer.valueOf(1), Integer.valueOf(2))); - Mockito.when(mockJobContext.getAssignedParticipant(2)).thenReturn("GobblinYarnTaskRunner-1"); - - Mockito.when(mockTaskDriver.getJobContext("job1")).thenReturn(mockJobContext); + getJobContext(mockTaskDriver, ImmutableMap.of(2, "GobblinYarnTaskRunner-1"), "job1", ImmutableSet.of(1, 2)); - HelixDataAccessor helixDataAccessor = mock(HelixDataAccessor.class); - Mockito.when(helixDataAccessor.keyBuilder()).thenReturn(new PropertyKey.Builder("cluster")); - Mockito.when(helixDataAccessor.getChildValuesMap(Mockito.any())) - .thenReturn(ImmutableMap.of("GobblinYarnTaskRunner-1", new HelixProperty(""))); + HelixDataAccessor helixDataAccessor = getHelixDataAccessor(Arrays.asList("GobblinYarnTaskRunner-1")); YarnAutoScalingManager.YarnAutoScalingRunnable runnable1 = new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, mockYarnService, 1, @@ -388,13 +241,9 @@ public void testOverprovision() { // 3 containers requested to max and one worker in use // NumPartitions = 2, Partitions per container = 1 and overprovision = 1.2 // so targetNumContainers = Ceil((2/1) * 1.2)) = 3. - ArgumentCaptor argument = ArgumentCaptor.forClass(YarnContainerRequestBundle.class); - Mockito.verify(mockYarnService, times(1)). - requestTargetNumberOfContainers(argument.capture(), - eq(ImmutableSet.of("GobblinYarnTaskRunner-1"))); - Assert.assertEquals(argument.getValue().getTotalContainers(), 3); - + assertContainerRequest(mockYarnService, 3, ImmutableSet.of("GobblinYarnTaskRunner-1")); Mockito.reset(mockYarnService); + YarnAutoScalingManager.YarnAutoScalingRunnable runnable2 = new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, mockYarnService, 1, 0.1, noopQueue, helixDataAccessor, defaultHelixTag, defaultContainerMemory, defaultContainerCores); @@ -404,10 +253,7 @@ public void testOverprovision() { // 3 containers requested to max and one worker in use // NumPartitions = 2, Partitions per container = 1 and overprovision = 1.2 // so targetNumContainers = Ceil((2/1) * 0.1)) = 1. - Mockito.verify(mockYarnService, times(1)). - requestTargetNumberOfContainers(argument.capture(), - eq(ImmutableSet.of("GobblinYarnTaskRunner-1"))); - Assert.assertEquals(argument.getValue().getTotalContainers(), 1); + assertContainerRequest(mockYarnService, 1, ImmutableSet.of("GobblinYarnTaskRunner-1")); Mockito.reset(mockYarnService); YarnAutoScalingManager.YarnAutoScalingRunnable runnable3 = @@ -419,44 +265,22 @@ public void testOverprovision() { // 3 containers requested to max and one worker in use // NumPartitions = 2, Partitions per container = 1 and overprovision = 6.0, // so targetNumContainers = Ceil((2/1) * 6.0)) = 12. - Mockito.verify(mockYarnService, times(1)). - requestTargetNumberOfContainers(argument.capture(), - eq(ImmutableSet.of("GobblinYarnTaskRunner-1"))); - Assert.assertEquals(argument.getValue().getTotalContainers(), 12); + assertContainerRequest(mockYarnService, 12, ImmutableSet.of("GobblinYarnTaskRunner-1")); } /** * Test suppressed exception */ @Test - public void testSuppressedException() throws IOException { + public void testSuppressedException() { YarnService mockYarnService = mock(YarnService.class); TaskDriver mockTaskDriver = mock(TaskDriver.class); - WorkflowConfig mockWorkflowConfig = mock(WorkflowConfig.class); - JobDag mockJobDag = mock(JobDag.class); + WorkflowConfig mockWorkflowConfig = getWorkflowConfig(mockTaskDriver, ImmutableSet.of("job1"), TaskState.IN_PROGRESS, TargetState.START, "workflow1"); + Mockito.when(mockTaskDriver.getWorkflows()).thenReturn(ImmutableMap.of("workflow1", mockWorkflowConfig)); - Mockito.when(mockJobDag.getAllNodes()).thenReturn(ImmutableSet.of("job1")); - Mockito.when(mockWorkflowConfig.getJobDag()).thenReturn(mockJobDag); + getJobContext(mockTaskDriver, ImmutableMap.of(2, "GobblinYarnTaskRunner-1"), "job1", ImmutableSet.of(1, 2)); - Mockito.when(mockTaskDriver.getWorkflows()) - .thenReturn(ImmutableMap.of("workflow1", mockWorkflowConfig)); - - WorkflowContext mockWorkflowContext = mock(WorkflowContext.class); - Mockito.when(mockWorkflowContext.getWorkflowState()).thenReturn(TaskState.IN_PROGRESS); - - Mockito.when(mockTaskDriver.getWorkflowContext("workflow1")).thenReturn(mockWorkflowContext); - - JobContext mockJobContext = mock(JobContext.class); - Mockito.when(mockJobContext.getPartitionSet()) - .thenReturn(ImmutableSet.of(Integer.valueOf(1), Integer.valueOf(2))); - Mockito.when(mockJobContext.getAssignedParticipant(2)).thenReturn("GobblinYarnTaskRunner-1"); - - Mockito.when(mockTaskDriver.getJobContext("job1")).thenReturn(mockJobContext); - - HelixDataAccessor helixDataAccessor = mock(HelixDataAccessor.class); - Mockito.when(helixDataAccessor.keyBuilder()).thenReturn(new PropertyKey.Builder("cluster")); - Mockito.when(helixDataAccessor.getChildValuesMap(Mockito.any())) - .thenReturn(ImmutableMap.of("GobblinYarnTaskRunner-1", new HelixProperty(""))); + HelixDataAccessor helixDataAccessor = getHelixDataAccessor(Arrays.asList("GobblinYarnTaskRunner-1")); TestYarnAutoScalingRunnable runnable = new TestYarnAutoScalingRunnable(mockTaskDriver, mockYarnService, 1, helixDataAccessor); @@ -471,14 +295,12 @@ public void testSuppressedException() throws IOException { Mockito.reset(mockYarnService); runnable.setRaiseException(false); runnable.run(); + // 2 container requested - Mockito.verify(mockYarnService, times(1)). - requestTargetNumberOfContainers(argument.capture(), - eq(ImmutableSet.of("GobblinYarnTaskRunner-1"))); - Assert.assertEquals(argument.getValue().getTotalContainers(), 2); + assertContainerRequest(mockYarnService, 2, ImmutableSet.of("GobblinYarnTaskRunner-1")); } - public void testMaxValueEvictingQueue() throws Exception { + public void testMaxValueEvictingQueue() { Resource resource = Resource.newInstance(16, 1); YarnAutoScalingManager.SlidingWindowReservoir window = new YarnAutoScalingManager.SlidingWindowReservoir(3, 10); // Normal insertion with eviction of originally largest value @@ -503,36 +325,16 @@ public void testMaxValueEvictingQueue() throws Exception { * candidate for scaling-down. */ @Test - public void testInstanceIdleBeyondTolerance() throws IOException { + public void testInstanceIdleBeyondTolerance() { YarnService mockYarnService = mock(YarnService.class); TaskDriver mockTaskDriver = mock(TaskDriver.class); - WorkflowConfig mockWorkflowConfig = mock(WorkflowConfig.class); - JobDag mockJobDag = mock(JobDag.class); - Mockito.when(mockJobDag.getAllNodes()).thenReturn(ImmutableSet.of("job1")); - Mockito.when(mockWorkflowConfig.getJobDag()).thenReturn(mockJobDag); - - Mockito.when(mockTaskDriver.getWorkflows()) - .thenReturn(ImmutableMap.of("workflow1", mockWorkflowConfig)); - - WorkflowContext mockWorkflowContext = mock(WorkflowContext.class); - Mockito.when(mockWorkflowContext.getWorkflowState()).thenReturn(TaskState.IN_PROGRESS); - - Mockito.when(mockTaskDriver.getWorkflowContext("workflow1")).thenReturn(mockWorkflowContext); + WorkflowConfig mockWorkflowConfig = getWorkflowConfig(mockTaskDriver, ImmutableSet.of("job1"), TaskState.IN_PROGRESS, TargetState.START, "workflow1"); + Mockito.when(mockTaskDriver.getWorkflows()).thenReturn(ImmutableMap.of("workflow1", mockWorkflowConfig)); // Having both partition assigned to single instance initially, in this case, GobblinYarnTaskRunner-2 - JobContext mockJobContext = mock(JobContext.class); - Mockito.when(mockJobContext.getPartitionSet()) - .thenReturn(ImmutableSet.of(Integer.valueOf(1), Integer.valueOf(2))); - Mockito.when(mockJobContext.getAssignedParticipant(1)).thenReturn("GobblinYarnTaskRunner-2"); - Mockito.when(mockJobContext.getAssignedParticipant(2)).thenReturn("GobblinYarnTaskRunner-2"); + getJobContext(mockTaskDriver, ImmutableMap.of(1,"GobblinYarnTaskRunner-2", 2, "GobblinYarnTaskRunner-2"), "job1"); - Mockito.when(mockTaskDriver.getJobContext("job1")).thenReturn(mockJobContext); - - HelixDataAccessor helixDataAccessor = mock(HelixDataAccessor.class); - Mockito.when(helixDataAccessor.keyBuilder()).thenReturn(new PropertyKey.Builder("cluster")); - Mockito.when(helixDataAccessor.getChildValuesMap(Mockito.any())) - .thenReturn(ImmutableMap.of("GobblinYarnTaskRunner-1", new HelixProperty(""), - "GobblinYarnTaskRunner-2", new HelixProperty(""))); + HelixDataAccessor helixDataAccessor = getHelixDataAccessor(Arrays.asList("GobblinYarnTaskRunner-1", "GobblinYarnTaskRunner-2")); TestYarnAutoScalingRunnable runnable = new TestYarnAutoScalingRunnable(mockTaskDriver, mockYarnService, 1, helixDataAccessor); @@ -541,21 +343,14 @@ public void testInstanceIdleBeyondTolerance() throws IOException { // 2 containers requested and one worker in use, while the evaluation will hold for true if not set externally, // still tell YarnService there are two instances being used. - ArgumentCaptor argument = ArgumentCaptor.forClass(YarnContainerRequestBundle.class); - Mockito.verify(mockYarnService, times(1)). - requestTargetNumberOfContainers(argument.capture(), - eq(ImmutableSet.of("GobblinYarnTaskRunner-1", "GobblinYarnTaskRunner-2"))); - Assert.assertEquals(argument.getValue().getTotalContainers(), 2); + assertContainerRequest(mockYarnService, 2, ImmutableSet.of("GobblinYarnTaskRunner-1", "GobblinYarnTaskRunner-2")); // Set failEvaluation which simulates the "beyond tolerance" case. Mockito.reset(mockYarnService); runnable.setAlwaysTagUnused(true); runnable.run(); - Mockito.verify(mockYarnService, times(1)). - requestTargetNumberOfContainers(argument.capture(), - eq(ImmutableSet.of("GobblinYarnTaskRunner-2"))); - Assert.assertEquals(argument.getValue().getTotalContainers(), 2); + assertContainerRequest(mockYarnService, 2, ImmutableSet.of("GobblinYarnTaskRunner-2")); } @Test @@ -563,63 +358,29 @@ public void testFlowsWithHelixTags() { YarnService mockYarnService = mock(YarnService.class); TaskDriver mockTaskDriver = mock(TaskDriver.class); - WorkflowConfig mockWorkflowConfig1 = mock(WorkflowConfig.class); - JobDag mockJobDag1 = mock(JobDag.class); - - Mockito.when(mockJobDag1.getAllNodes()).thenReturn(ImmutableSet.of("job1", "job2")); - Mockito.when(mockWorkflowConfig1.getJobDag()).thenReturn(mockJobDag1); - - WorkflowContext mockWorkflowContext1 = mock(WorkflowContext.class); - Mockito.when(mockWorkflowContext1.getWorkflowState()).thenReturn(TaskState.IN_PROGRESS); - - Mockito.when(mockTaskDriver.getWorkflowContext("workflow1")).thenReturn(mockWorkflowContext1); - - JobContext mockJobContext1 = mock(JobContext.class); - Mockito.when(mockJobContext1.getPartitionSet()) - .thenReturn(ImmutableSet.of(Integer.valueOf(1), Integer.valueOf(2))); - Mockito.when(mockJobContext1.getAssignedParticipant(2)).thenReturn("GobblinYarnTaskRunner-1"); - Mockito.when(mockTaskDriver.getJobContext("job1")).thenReturn(mockJobContext1); - - JobContext mockJobContext2 = mock(JobContext.class); - Mockito.when(mockJobContext2.getPartitionSet()) - .thenReturn(ImmutableSet.of(Integer.valueOf(3))); - Mockito.when(mockJobContext2.getAssignedParticipant(3)).thenReturn("GobblinYarnTaskRunner-2"); - Mockito.when(mockTaskDriver.getJobContext("job2")).thenReturn(mockJobContext2); - - WorkflowConfig mockWorkflowConfig2 = mock(WorkflowConfig.class); - JobDag mockJobDag2 = mock(JobDag.class); - - Mockito.when(mockJobDag2.getAllNodes()).thenReturn(ImmutableSet.of("job3")); - Mockito.when(mockWorkflowConfig2.getJobDag()).thenReturn(mockJobDag2); - - WorkflowContext mockWorkflowContext2 = mock(WorkflowContext.class); - Mockito.when(mockWorkflowContext2.getWorkflowState()).thenReturn(TaskState.IN_PROGRESS); + WorkflowConfig mockWorkflowConfig1 = + getWorkflowConfig(mockTaskDriver, ImmutableSet.of("job1", "job2"), TaskState.IN_PROGRESS, TargetState.START, "workflow1"); + WorkflowConfig mockWorkflowConfig2 = + getWorkflowConfig(mockTaskDriver, ImmutableSet.of("job3"), TaskState.IN_PROGRESS, TargetState.START, "workflow2"); + Mockito.when(mockTaskDriver.getWorkflows()) + .thenReturn(ImmutableMap.of("workflow1", mockWorkflowConfig1, "workflow2", mockWorkflowConfig2)); - Mockito.when(mockTaskDriver.getWorkflowContext("workflow2")).thenReturn(mockWorkflowContext2); + getJobContext(mockTaskDriver, ImmutableMap.of(2, "GobblinYarnTaskRunner-1"), "job1", ImmutableSet.of(1, 2)); + getJobContext(mockTaskDriver, ImmutableMap.of(3, "GobblinYarnTaskRunner-2"), "job2"); + getJobContext(mockTaskDriver, ImmutableMap.of(4, "GobblinYarnTaskRunner-3"), "job3", ImmutableSet.of(4, 5)); - JobContext mockJobContext3 = mock(JobContext.class); - Mockito.when(mockJobContext3.getPartitionSet()) - .thenReturn(ImmutableSet.of(Integer.valueOf(4), Integer.valueOf(5))); - Mockito.when(mockJobContext3.getAssignedParticipant(4)).thenReturn("GobblinYarnTaskRunner-3"); - Mockito.when(mockTaskDriver.getJobContext("job3")).thenReturn(mockJobContext3); JobConfig mockJobConfig3 = mock(JobConfig.class); + Mockito.when(mockTaskDriver.getJobConfig("job3")).thenReturn(mockJobConfig3); String helixTag = "test-Tag1"; - Map resourceMap = new HashMap<>(); - resourceMap.put(GobblinClusterConfigurationKeys.HELIX_JOB_CONTAINER_MEMORY_MBS, "512"); - resourceMap.put(GobblinClusterConfigurationKeys.HELIX_JOB_CONTAINER_CORES, "8"); + Map resourceMap = ImmutableMap.of( + GobblinClusterConfigurationKeys.HELIX_JOB_CONTAINER_MEMORY_MBS, "512", + GobblinClusterConfigurationKeys.HELIX_JOB_CONTAINER_CORES, "8" + ); Mockito.when(mockJobConfig3.getInstanceGroupTag()).thenReturn(helixTag); Mockito.when(mockJobConfig3.getJobCommandConfigMap()).thenReturn(resourceMap); - Mockito.when(mockTaskDriver.getJobContext("job3")).thenReturn(mockJobContext3); - Mockito.when(mockTaskDriver.getJobConfig("job3")).thenReturn(mockJobConfig3); - Mockito.when(mockTaskDriver.getWorkflows()) - .thenReturn(ImmutableMap.of("workflow1", mockWorkflowConfig1, "workflow2", mockWorkflowConfig2)); - HelixDataAccessor helixDataAccessor = mock(HelixDataAccessor.class); - Mockito.when(helixDataAccessor.keyBuilder()).thenReturn(new PropertyKey.Builder("cluster")); - Mockito.when(helixDataAccessor.getChildValuesMap(Mockito.any())) - .thenReturn(ImmutableMap.of("GobblinYarnTaskRunner-1", new HelixProperty(""), - "GobblinYarnTaskRunner-2", new HelixProperty(""), - "GobblinYarnTaskRunner-3", new HelixProperty(""))); + HelixDataAccessor helixDataAccessor = getHelixDataAccessor( + Arrays.asList("GobblinYarnTaskRunner-1", "GobblinYarnTaskRunner-2", "GobblinYarnTaskRunner-3")); YarnAutoScalingManager.YarnAutoScalingRunnable runnable = new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, mockYarnService, 1, @@ -629,22 +390,76 @@ public void testFlowsWithHelixTags() { // 5 containers requested and 3 workers in use ArgumentCaptor argument = ArgumentCaptor.forClass(YarnContainerRequestBundle.class); - Mockito.verify(mockYarnService, times(1)). - requestTargetNumberOfContainers(argument.capture(), - eq(ImmutableSet.of("GobblinYarnTaskRunner-1", "GobblinYarnTaskRunner-2", "GobblinYarnTaskRunner-3"))); - Assert.assertEquals(argument.getValue().getTotalContainers(), 5); + assertContainerRequest(argument, mockYarnService, 5, ImmutableSet.of("GobblinYarnTaskRunner-1", "GobblinYarnTaskRunner-2", "GobblinYarnTaskRunner-3")); + + // Verify that 3 containers requested with default tag and resource setting, + // while 2 with specific helix tag and resource requirement Map> resourceHelixTagMap = argument.getValue().getResourceHelixTagMap(); Map helixTagResourceMap = argument.getValue().getHelixTagResourceMap(); Map helixTagContainerCountMap = argument.getValue().getHelixTagContainerCountMap(); - // Verify that 3 containers requested with default tag and resource setting, - // while 2 with specific helix tag and resource requirement Assert.assertEquals(resourceHelixTagMap.size(), 2); Assert.assertEquals(helixTagResourceMap.get(helixTag), Resource.newInstance(512, 8)); Assert.assertEquals(helixTagResourceMap.get(defaultHelixTag), Resource.newInstance(defaultContainerMemory, defaultContainerCores)); Assert.assertEquals((int) helixTagContainerCountMap.get(helixTag), 2); Assert.assertEquals((int) helixTagContainerCountMap.get(defaultHelixTag), 3); + } + + private HelixDataAccessor getHelixDataAccessor(List taskRunners) { + HelixDataAccessor helixDataAccessor = mock(HelixDataAccessor.class); + Mockito.when(helixDataAccessor.keyBuilder()).thenReturn(new PropertyKey.Builder("cluster")); + + Mockito.when(helixDataAccessor.getChildValuesMap(Mockito.any())).thenReturn( + taskRunners.stream().collect(Collectors.toMap((name) -> name, (name) -> new HelixProperty("")))); + return helixDataAccessor; + } + + private WorkflowConfig getWorkflowConfig(TaskDriver mockTaskDriver, ImmutableSet jobNames, + TaskState taskState, TargetState targetState, String workflowName) { + WorkflowConfig mockWorkflowConfig1 = mock(WorkflowConfig.class); + JobDag mockJobDag1 = mock(JobDag.class); + + Mockito.when(mockJobDag1.getAllNodes()).thenReturn(jobNames); + Mockito.when(mockWorkflowConfig1.getJobDag()).thenReturn(mockJobDag1); + Mockito.when(mockWorkflowConfig1.getTargetState()).thenReturn(targetState); + + WorkflowContext mockWorkflowContext1 = mock(WorkflowContext.class); + Mockito.when(mockWorkflowContext1.getWorkflowState()).thenReturn(taskState); + + Mockito.when(mockTaskDriver.getWorkflowContext(workflowName)).thenReturn(mockWorkflowContext1); + return mockWorkflowConfig1; + } + + private JobContext getJobContext(TaskDriver mockTaskDriver, Map assignedParticipantMap, String jobName) { + return getJobContext(mockTaskDriver, assignedParticipantMap, jobName, assignedParticipantMap.keySet()); + } + + private JobContext getJobContext( + TaskDriver mockTaskDriver, + Map assignedParticipantMap, + String jobName, + Set partitionSet) { + JobContext mockJobContext = mock(JobContext.class); + Mockito.when(mockJobContext.getPartitionSet()).thenReturn(ImmutableSet.copyOf(partitionSet)); + for (Map.Entry entry : assignedParticipantMap.entrySet()) { + Mockito.when(mockJobContext.getAssignedParticipant(entry.getKey())).thenReturn(entry.getValue()); + } + Mockito.when(mockTaskDriver.getJobContext(jobName)).thenReturn(mockJobContext); + return mockJobContext; + } + + private void assertContainerRequest(ArgumentCaptor argument, YarnService mockYarnService, int expectedNumberOfContainers, + ImmutableSet expectedInUseInstances) { + ArgumentCaptor.forClass(YarnContainerRequestBundle.class); + Mockito.verify(mockYarnService, times(1)). + requestTargetNumberOfContainers(argument.capture(), + eq(expectedInUseInstances)); + Assert.assertEquals(argument.getValue().getTotalContainers(), expectedNumberOfContainers); + } + private void assertContainerRequest(YarnService mockYarnService, int expectedNumberOfContainers, + ImmutableSet expectedInUseInstances) { + assertContainerRequest(ArgumentCaptor.forClass(YarnContainerRequestBundle.class), mockYarnService, expectedNumberOfContainers, expectedInUseInstances); } private static class TestYarnAutoScalingRunnable extends YarnAutoScalingManager.YarnAutoScalingRunnable {