diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java index 9ad9d68a5a5..e941c947df8 100644 --- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java +++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java @@ -77,6 +77,7 @@ public static class FlowTimings { public static final String FLOW_RUN_DEADLINE_EXCEEDED = "FlowRunDeadlineExceeded"; public static final String FLOW_START_DEADLINE_EXCEEDED = "FlowStartDeadlineExceeded"; public static final String FLOW_PENDING_RESUME = "FlowPendingResume"; + public static final String FLOW_SKIPPED = "FlowSkipped"; } public static class FlowEventConstants { diff --git a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java index 293d77fbc98..720ba270dec 100644 --- a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java +++ b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java @@ -836,7 +836,7 @@ public void testObservabilityEventFlowFailed() throws IOException, ReflectiveOpe jobStatusMonitor.shutDown(); } - @Test// (dependsOnMethods = "testObservabilityEventFlowFailed") + @Test public void testProcessMessageForSkippedEvent() throws IOException, ReflectiveOperationException { DagManagementStateStore dagManagementStateStore = mock(DagManagementStateStore.class); KafkaEventReporter kafkaReporter = builder.build("localhost:0000", "topic8"); diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java index 33c3be33045..adfe5157d46 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java @@ -43,7 +43,6 @@ import org.apache.gobblin.service.modules.flowgraph.Dag; import org.apache.gobblin.service.modules.flowgraph.DagNodeId; import org.apache.gobblin.service.modules.spec.JobExecutionPlan; -import org.apache.gobblin.service.monitoring.FlowStatusGenerator; import org.apache.gobblin.service.monitoring.JobStatus; import org.apache.gobblin.service.monitoring.JobStatusRetriever; import org.apache.gobblin.util.ConfigUtils; diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java index 42c7357b0a4..04361225b69 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java @@ -157,7 +157,7 @@ public static void submitJobToExecutor(DagManagementStateStore dagManagementStat log.info("Submitted job {} for dagId {}", DagUtils.getJobName(dagNode), dagId); } - public static void cancelDagNode(Dag.DagNode dagNodeToCancel, DagManagementStateStore dagManagementStateStore) throws IOException { + public static void cancelDagNode(Dag.DagNode dagNodeToCancel) throws IOException { Properties cancelJobArgs = new Properties(); String serializedFuture = null; @@ -185,22 +185,31 @@ public static void cancelDagNode(Dag.DagNode dagNodeToCancel, /** * Emits JOB_SKIPPED GTE for each of the dependent job. 
*/ - public static void sendSkippedEventForDependentJobs(Dag dag, Dag.DagNode node, - DagManagementStateStore dagManagementStateStore) throws IOException { - for (Dag.DagNode child : dag.getChildren(node)) { - child.getValue().setExecutionStatus(SKIPPED); - dagManagementStateStore.updateDagNode(child); - Map jobMetadata = TimingEventUtils.getJobMetadata(Maps.newHashMap(), child.getValue()); + public static void sendSkippedEventForDependentJobs(Dag dag, Dag.DagNode node) { + Set> dependentJobs = new HashSet<>(); + findDependentJobs(dag, node, dependentJobs); + for (Dag.DagNode dependentJob : dependentJobs) { + Map jobMetadata = TimingEventUtils.getJobMetadata(Maps.newHashMap(), dependentJob.getValue()); DagProc.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_SKIPPED).stop(jobMetadata); } } - public static void cancelDag(Dag dag, DagManagementStateStore dagManagementStateStore) throws IOException { + private static void findDependentJobs(Dag dag, + Dag.DagNode node, Set> dependentJobs) { + for (Dag.DagNode child : dag.getChildren(node)) { + // Set.add returns false for a node that was already collected, so a job shared by several parents is visited once + if (dependentJobs.add(child)) { + findDependentJobs(dag, child, dependentJobs); + } + } + } + + public static void cancelDag(Dag dag) throws IOException { List> dagNodesToCancel = dag.getNodes(); log.info("Found {} DagNodes to cancel (DagId {}).", dagNodesToCancel.size(), DagUtils.generateDagId(dag)); for (Dag.DagNode dagNodeToCancel : dagNodesToCancel) { - DagProcUtils.cancelDagNode(dagNodeToCancel, dagManagementStateStore); + DagProcUtils.cancelDagNode(dagNodeToCancel); } } diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceFlowFinishDeadlineDagProc.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceFlowFinishDeadlineDagProc.java index 0d4a21058ac..348a3ae4599 100644 --- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceFlowFinishDeadlineDagProc.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceFlowFinishDeadlineDagProc.java @@ -57,7 +57,7 @@ protected void enforceDeadline(DagManagementStateStore dagManagementStateStore, log.info("Found {} DagNodes to cancel (DagId {}).", dagNodesToCancel.size(), getDagId()); for (Dag.DagNode dagNodeToCancel : dagNodesToCancel) { - DagProcUtils.cancelDagNode(dagNodeToCancel, dagManagementStateStore); + DagProcUtils.cancelDagNode(dagNodeToCancel); } dag.setFlowEvent(TimingEvent.FlowTimings.FLOW_RUN_DEADLINE_EXCEEDED); diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceJobStartDeadlineDagProc.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceJobStartDeadlineDagProc.java index 065e43c2d6b..4696e8c55ec 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceJobStartDeadlineDagProc.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceJobStartDeadlineDagProc.java @@ -78,7 +78,7 @@ protected void enforceDeadline(DagManagementStateStore dagManagementStateStore, log.info("Job exceeded the job start deadline. Killing it now. 
Job - {}, jobOrchestratedTime - {}, timeOutForJobStart - {}", DagUtils.getJobName(dagNode), jobOrchestratedTime, timeOutForJobStart); dagManagementStateStore.getDagManagerMetrics().incrementCountsStartSlaExceeded(dagNode); - DagProcUtils.cancelDagNode(dagNode, dagManagementStateStore); + DagProcUtils.cancelDagNode(dagNode); dag.setFlowEvent(TimingEvent.FlowTimings.FLOW_START_DEADLINE_EXCEEDED); dag.setMessage("Flow killed because no update received for " + timeOutForJobStart + " ms after orchestration"); } diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProc.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProc.java index 64ddb0dbb70..3c1a09bad96 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProc.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProc.java @@ -69,13 +69,13 @@ protected void act(DagManagementStateStore dagManagementStateStore, Optional> dagNodeToCancel = dagManagementStateStore.getDagNodeWithJobStatus(this.dagNodeId).getLeft(); if (dagNodeToCancel.isPresent()) { - DagProcUtils.cancelDagNode(dagNodeToCancel.get(), dagManagementStateStore); + DagProcUtils.cancelDagNode(dagNodeToCancel.get()); } else { dagProcEngineMetrics.markDagActionsAct(getDagActionType(), false); log.error("Did not find Dag node with id {}, it might be already cancelled/finished and thus cleaned up from the store.", getDagNodeId()); } } else { - DagProcUtils.cancelDag(dag.get(), dagManagementStateStore); + DagProcUtils.cancelDag(dag.get()); } dagProcEngineMetrics.markDagActionsAct(getDagActionType(), true); } diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java index 7c5a0d47b7e..dabfa55aaf6 100644 --- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java @@ -80,7 +80,7 @@ protected Optional> initialize(DagManagementStateStore dag protected void act(DagManagementStateStore dagManagementStateStore, Optional> dag, DagProcessingEngineMetrics dagProcEngineMetrics) throws IOException { if (!dag.isPresent()) { - log.warn("Dag with id " + getDagId() + " could not be compiled."); + log.warn("Dag with id " + getDagId() + " could not be compiled or cannot run concurrently."); dagProcEngineMetrics.markDagActionsAct(getDagActionType(), false); } else { DagProcUtils.submitNextNodes(dagManagementStateStore, dag.get(), getDagId()); diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java index 77c7469236d..ccc63e62f44 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java @@ -60,8 +60,11 @@ protected Pair>, Optional> ini protected void act(DagManagementStateStore dagManagementStateStore, Pair>, Optional> dagNodeWithJobStatus, DagProcessingEngineMetrics dagProcEngineMetrics) throws IOException { if (!dagNodeWithJobStatus.getLeft().isPresent()) { - // one of the reason this could arise is when the MALA leasing doesn't work cleanly and another DagProc::process - // has cleaned up the Dag, yet did not complete the lease before this current one acquired its own + // One of the reasons this could arise is when the MALA leasing doesn't work cleanly and another DagProc::process + // has cleaned up the Dag, yet did not complete the lease before this current one acquired its own. 
+ // Another reason could be that LaunchDagProc was unable to compile the FlowSpec or the flow cannot run concurrently. + // In these cases FLOW_FAILED and FLOW_SKIPPED events are emitted respectively, which are terminal statuses and + // create a ReevaluateDagProc, but in these cases the Dag was never created or never saved. log.error("DagNode or its job status not found for a Reevaluate DagAction with dag node id {}", this.dagNodeId); dagProcEngineMetrics.markDagActionsAct(getDagActionType(), false); return; @@ -99,6 +102,8 @@ protected void act(DagManagementStateStore dagManagementStateStore, Pair> validateAndHandleConcurrentExecution(Conf quotaManager.releaseQuota(dagNode); } } - // Send FLOW_FAILED event - flowMetadata.put(TimingEvent.METADATA_MESSAGE, "Flow failed because another instance is running and concurrent " + // Send FLOW_SKIPPED event + flowMetadata.put(TimingEvent.METADATA_MESSAGE, "Flow is skipped because another instance is running and concurrent " + "executions are disabled. 
Set flow.allowConcurrentExecution to true in the flowSpec to change this behaviour."); - new TimingEvent(eventSubmitter, TimingEvent.FlowTimings.FLOW_FAILED).stop(flowMetadata); + new TimingEvent(eventSubmitter, TimingEvent.FlowTimings.FLOW_SKIPPED).stop(flowMetadata); return Optional.absent(); } } diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java index d6d18df30a6..fc64459016d 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java @@ -141,6 +141,7 @@ public org.apache.gobblin.configuration.State parseJobStatus(GobblinTrackingEven case TimingEvent.LauncherTimings.JOB_PENDING: properties.put(JobStatusRetriever.EVENT_NAME_FIELD, ExecutionStatus.PENDING.name()); break; + case TimingEvent.FlowTimings.FLOW_SKIPPED: case TimingEvent.LauncherTimings.JOB_SKIPPED: properties.put(JobStatusRetriever.EVENT_NAME_FIELD, ExecutionStatus.SKIPPED.name()); break; diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java index 8e94489636d..3091b4cc5e9 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java @@ -115,8 +115,8 @@ public abstract class KafkaJobStatusMonitor extends HighLevelConsumer ORDERED_EXECUTION_STATUSES = ImmutableList - .of(ExecutionStatus.COMPILED, ExecutionStatus.PENDING, ExecutionStatus.SKIPPED, ExecutionStatus.PENDING_RESUME, - ExecutionStatus.PENDING_RETRY, ExecutionStatus.ORCHESTRATED, ExecutionStatus.RUNNING, ExecutionStatus.COMPLETE, + 
.of(ExecutionStatus.COMPILED, ExecutionStatus.PENDING, ExecutionStatus.PENDING_RESUME, ExecutionStatus.PENDING_RETRY, + ExecutionStatus.ORCHESTRATED, ExecutionStatus.RUNNING, ExecutionStatus.SKIPPED, ExecutionStatus.COMPLETE, ExecutionStatus.FAILED, ExecutionStatus.CANCELLED); private final JobIssueEventHandler jobIssueEventHandler; diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProcTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProcTest.java index 8b673277b2b..36b9ba17cc9 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProcTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProcTest.java @@ -19,7 +19,6 @@ import java.io.IOException; import java.net.URISyntaxException; -import java.util.HashSet; import java.util.List; import java.util.Optional; import java.util.concurrent.ExecutionException; @@ -225,20 +224,5 @@ public void killDagNode() throws IOException, URISyntaxException, InterruptedExc .submit(eq(TimingEvent.LauncherTimings.JOB_CANCEL), anyMap()); Mockito.verify(this.mockedEventSubmitter, Mockito.times(numOfCancelledFlows)) .submit(eq(TimingEvent.FlowTimings.FLOW_CANCELLED), anyMap()); - - Assert.assertEquals(dag.getNodes().get(0).getValue().getExecutionStatus(), ExecutionStatus.ORCHESTRATED); // because this was a mocked dag and we launched this job - Assert.assertEquals(dag.getNodes().get(1).getValue().getExecutionStatus(), ExecutionStatus.PENDING); // because this was a mocked dag and we did not launch the job - Assert.assertEquals(dag.getNodes().get(2).getValue().getExecutionStatus(), ExecutionStatus.CANCELLED); // because we cancelled this job - Assert.assertEquals(dag.getNodes().get(3).getValue().getExecutionStatus(), ExecutionStatus.PENDING); // because this was a mocked dag and we did not launch the job - 
Assert.assertEquals(dag.getNodes().get(4).getValue().getExecutionStatus(), ExecutionStatus.SKIPPED); // because its parent job was cancelled - - Assert.assertTrue(dagManagementStateStore.hasRunningJobs(dagId)); - - dag.getNodes().get(0).getValue().setExecutionStatus(ExecutionStatus.COMPLETE); - dag.getNodes().get(1).getValue().setExecutionStatus(ExecutionStatus.COMPLETE); - dag.getNodes().get(3).getValue().setExecutionStatus(ExecutionStatus.COMPLETE); - - doReturn(new HashSet<>(dag.getNodes())).when(dagManagementStateStore).getDagNodes(any()); - Assert.assertFalse(dagManagementStateStore.hasRunningJobs(dagId)); } } diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java index fda632b299c..5fc4cb6cec6 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java @@ -49,9 +49,10 @@ import org.apache.gobblin.service.modules.flowgraph.Dag; import org.apache.gobblin.service.modules.orchestration.DagActionStore; import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore; +import org.apache.gobblin.service.modules.orchestration.DagManagerTest; +import org.apache.gobblin.service.modules.orchestration.DagProcessingEngine; import org.apache.gobblin.service.modules.orchestration.DagTestUtils; import org.apache.gobblin.service.modules.orchestration.DagUtils; -import org.apache.gobblin.service.modules.orchestration.DagProcessingEngine; import org.apache.gobblin.service.modules.orchestration.MySqlDagManagementStateStoreTest; import org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics; import org.apache.gobblin.service.modules.orchestration.task.ReevaluateDagTask; @@ -321,17 
+322,16 @@ public void testRetryCurrentFailedJob() throws Exception { @Test public void testCancelledJob() throws Exception { String flowName = "fn5"; - Dag dag = DagManagerTest.buildDag("1", flowExecutionId, DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(), + Dag dag = DagTestUtils.buildDag("1", flowExecutionId, DagProcessingEngine.FailureOption.FINISH_ALL_POSSIBLE.name(), 3, "user5", ConfigFactory.empty() .withValue(ConfigurationKeys.FLOW_GROUP_KEY, ConfigValueFactory.fromAnyRef(flowGroup)) .withValue(ConfigurationKeys.FLOW_NAME_KEY, ConfigValueFactory.fromAnyRef(flowName)) .withValue(ConfigurationKeys.JOB_GROUP_KEY, ConfigValueFactory.fromAnyRef(flowGroup)) .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY, ConfigValueFactory.fromAnyRef( MySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI))); - DagManager.DagId dagId = DagManagerUtils.generateDagId(dag); + dagManagementStateStore.addDag(dag); List> specProducers = getDagSpecProducers(dag); - dagManagementStateStore.addDag(dag); JobStatus jobStatus1 = JobStatus.builder().flowName(flowName).flowGroup(flowGroup).jobGroup(flowGroup) .jobName("job0").flowExecutionId(flowExecutionId).message("Test message").eventName(ExecutionStatus.CANCELLED.name()) .startTime(flowExecutionId).shouldRetry(false).orchestratedTime(flowExecutionId).build(); @@ -341,6 +341,7 @@ public void testCancelledJob() throws Exception { JobStatus jobStatus3 = JobStatus.builder().flowName(flowName).flowGroup(flowGroup).jobGroup(flowGroup) .jobName("job2").flowExecutionId(flowExecutionId).message("Test message").eventName(ExecutionStatus.SKIPPED.name()) .startTime(flowExecutionId).shouldRetry(false).orchestratedTime(flowExecutionId).build(); + doReturn(new ImmutablePair<>(Optional.of(dag.getNodes().get(0)), Optional.of(jobStatus1))) .when(dagManagementStateStore).getDagNodeWithJobStatus(any()); @@ -352,15 +353,25 @@ public void testCancelledJob() throws Exception { // job cancelled, so no more jobs to launch specProducers.forEach(sp 
-> Mockito.verify(sp, Mockito.never()).addSpec(any())); - // dag should still have the running jobs that are skipped - Assert.assertTrue(dagManagementStateStore.hasRunningJobs(dagId)); + // dag is considered finished because the remaining jobs depend on this cancelled job, + // so it should have been marked `failed` Mockito.verify(dagManagementStateStore, Mockito.never()).deleteDag(any()); - Mockito.verify(dagManagementStateStore, Mockito.never()).markDagFailed(any()); - // and also remove the ENFORCE_FLOW_FINISH_DEADLINE dag action - Mockito.verify(dagManagementStateStore, Mockito.never()).deleteDagAction(any()); - // JOB_SKIPPED is emitted twice, once for each of the child job + Mockito.verify(dagManagementStateStore, Mockito.times(1)).markDagFailed(any()); + + // and the ENFORCE_FLOW_FINISH_DEADLINE dag action should have been deleted + Mockito.verify(dagManagementStateStore, Mockito.times(1)).deleteDagAction(any()); + Mockito.verify(dagManagementStateStore, Mockito.times(1)).deleteDagAction( + argThat(dagAction -> dagAction.getDagActionType() == DagActionStore.DagActionType.ENFORCE_FLOW_FINISH_DEADLINE)); + + // total of three events should be emitted. 
two JOB_SKIPPED events, once for each of the child job and one flow_cancelled event Mockito.verify(this.mockedEventSubmitter, Mockito.times(2)) .submit(eq(TimingEvent.LauncherTimings.JOB_SKIPPED), anyMap()); + Mockito.verify(this.mockedEventSubmitter, Mockito.times(1)) + .submit(eq(TimingEvent.FlowTimings.FLOW_CANCELLED), anyMap()); + Mockito.verify(this.mockedEventSubmitter, Mockito.times(3)).submit(any(), anyMap()); + + // this finished dag should not create any new dag actions + Mockito.verify(dagManagementStateStore, Mockito.never()).addJobDagAction(any(), any(), anyLong(), any(), any()); doReturn(new ImmutablePair<>(Optional.of(dag.getNodes().get(1)), Optional.of(jobStatus2))) .when(dagManagementStateStore).getDagNodeWithJobStatus(any()); @@ -369,16 +380,19 @@ public void testCancelledJob() throws Exception { dagManagementStateStore, mockedDagProcEngineMetrics), ConfigFactory.empty()); reEvaluateDagProc2.process(dagManagementStateStore, mockedDagProcEngineMetrics); - // dag should still have the running jobs that are skipped - Assert.assertTrue(dagManagementStateStore.hasRunningJobs(dagId)); + // reEvaluateDagProc2 should exit without doing anything because the dag is already deleted + specProducers.forEach(sp -> Mockito.verify(sp, Mockito.never()).addSpec(any())); Mockito.verify(dagManagementStateStore, Mockito.never()).deleteDag(any()); - Mockito.verify(dagManagementStateStore, Mockito.never()).markDagFailed(any()); - // and also remove the ENFORCE_FLOW_FINISH_DEADLINE dag action - Mockito.verify(dagManagementStateStore, Mockito.never()).deleteDagAction(any()); - // JOB_SKIPPED is emitted twice, once for each of the child job + Mockito.verify(dagManagementStateStore, Mockito.times(1)).markDagFailed(any()); + Mockito.verify(dagManagementStateStore, Mockito.times(1)).deleteDagAction(any()); + Mockito.verify(dagManagementStateStore, Mockito.times(1)).deleteDagAction( + argThat(dagAction -> dagAction.getDagActionType() == 
DagActionStore.DagActionType.ENFORCE_FLOW_FINISH_DEADLINE)); Mockito.verify(this.mockedEventSubmitter, Mockito.times(2)) .submit(eq(TimingEvent.LauncherTimings.JOB_SKIPPED), anyMap()); - + Mockito.verify(this.mockedEventSubmitter, Mockito.times(1)) + .submit(eq(TimingEvent.FlowTimings.FLOW_CANCELLED), anyMap()); + Mockito.verify(this.mockedEventSubmitter, Mockito.times(3)).submit(any(), anyMap()); + Mockito.verify(dagManagementStateStore, Mockito.never()).addJobDagAction(any(), any(), anyLong(), any(), any()); doReturn(new ImmutablePair<>(Optional.of(dag.getNodes().get(2)), Optional.of(jobStatus3))) .when(dagManagementStateStore).getDagNodeWithJobStatus(any()); @@ -387,61 +401,139 @@ public void testCancelledJob() throws Exception { dagManagementStateStore, mockedDagProcEngineMetrics), ConfigFactory.empty()); reEvaluateDagProc3.process(dagManagementStateStore, mockedDagProcEngineMetrics); - // this final relaunch dag proc should mark the dag failed + // reEvaluateDagProc3 should exit without doing anything because the dag is already deleted + specProducers.forEach(sp -> Mockito.verify(sp, Mockito.never()).addSpec(any())); Mockito.verify(dagManagementStateStore, Mockito.never()).deleteDag(any()); Mockito.verify(dagManagementStateStore, Mockito.times(1)).markDagFailed(any()); - // and also remove the ENFORCE_FLOW_FINISH_DEADLINE dag action + Mockito.verify(dagManagementStateStore, Mockito.times(1)).deleteDagAction(any()); Mockito.verify(dagManagementStateStore, Mockito.times(1)).deleteDagAction( argThat(dagAction -> dagAction.getDagActionType() == DagActionStore.DagActionType.ENFORCE_FLOW_FINISH_DEADLINE)); - - // this finished dag should not create any new dag actions + Mockito.verify(this.mockedEventSubmitter, Mockito.times(2)) + .submit(eq(TimingEvent.LauncherTimings.JOB_SKIPPED), anyMap()); + Mockito.verify(this.mockedEventSubmitter, Mockito.times(1)) + .submit(eq(TimingEvent.FlowTimings.FLOW_CANCELLED), anyMap()); + Mockito.verify(this.mockedEventSubmitter, 
Mockito.times(3)).submit(any(), anyMap()); Mockito.verify(dagManagementStateStore, Mockito.never()).addJobDagAction(any(), any(), anyLong(), any(), any()); - Mockito.verify(this.mockedEventSubmitter, Mockito.times(2)).submit(any(), anyMap()); } @Test public void testFailedJob() throws Exception { - String flowName = "fn5"; - Dag dag = DagManagerTest.buildDag("1", flowExecutionId, DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(), - 3, "user5", ConfigFactory.empty() + String flowName = "fn6"; + Dag dag = DagManagerTest.buildDag("1", flowExecutionId, DagProcessingEngine.FailureOption.FINISH_ALL_POSSIBLE.name(), + 4, "user5", ConfigFactory.empty() .withValue(ConfigurationKeys.FLOW_GROUP_KEY, ConfigValueFactory.fromAnyRef(flowGroup)) .withValue(ConfigurationKeys.FLOW_NAME_KEY, ConfigValueFactory.fromAnyRef(flowName)) .withValue(ConfigurationKeys.JOB_GROUP_KEY, ConfigValueFactory.fromAnyRef(flowGroup)) .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY, ConfigValueFactory.fromAnyRef( MySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI))); - DagManager.DagId dagId = DagManagerUtils.generateDagId(dag); + dagManagementStateStore.addDag(dag); List> specProducers = getDagSpecProducers(dag); - dagManagementStateStore.addDag(dag); JobStatus jobStatus = JobStatus.builder().flowName(flowName).flowGroup(flowGroup).jobGroup(flowGroup) .jobName("job0").flowExecutionId(flowExecutionId).message("Test message").eventName(ExecutionStatus.FAILED.name()) .startTime(flowExecutionId).shouldRetry(false).orchestratedTime(flowExecutionId).build(); + JobStatus jobStatus2 = JobStatus.builder().flowName(flowName).flowGroup(flowGroup).jobGroup(flowGroup) + .jobName("job1").flowExecutionId(flowExecutionId).message("Test message").eventName(ExecutionStatus.SKIPPED.name()) + .startTime(flowExecutionId).shouldRetry(false).orchestratedTime(flowExecutionId).build(); + JobStatus jobStatus3 = JobStatus.builder().flowName(flowName).flowGroup(flowGroup).jobGroup(flowGroup) + 
.jobName("job2").flowExecutionId(flowExecutionId).message("Test message").eventName(ExecutionStatus.SKIPPED.name()) + .startTime(flowExecutionId).shouldRetry(false).orchestratedTime(flowExecutionId).build(); + JobStatus jobStatus4 = JobStatus.builder().flowName(flowName).flowGroup(flowGroup).jobGroup(flowGroup) + .jobName("job3").flowExecutionId(flowExecutionId).message("Test message").eventName(ExecutionStatus.SKIPPED.name()) + .startTime(flowExecutionId).shouldRetry(false).orchestratedTime(flowExecutionId).build(); + doReturn(new ImmutablePair<>(Optional.of(dag.getNodes().get(0)), Optional.of(jobStatus))) .when(dagManagementStateStore).getDagNodeWithJobStatus(any()); - ReevaluateDagProc reEvaluateDagProc = new ReevaluateDagProc(new ReevaluateDagTask(new DagActionStore.DagAction( + ReevaluateDagProc reEvaluateDagProc1 = new ReevaluateDagProc(new ReevaluateDagTask(new DagActionStore.DagAction( flowGroup, flowName, flowExecutionId, "job0", DagActionStore.DagActionType.REEVALUATE), null, dagManagementStateStore, mockedDagProcEngineMetrics), ConfigFactory.empty()); - reEvaluateDagProc.process(dagManagementStateStore, mockedDagProcEngineMetrics); + reEvaluateDagProc1.process(dagManagementStateStore, mockedDagProcEngineMetrics); // job cancelled, so no more jobs to launch specProducers.forEach(sp -> Mockito.verify(sp, Mockito.never()).addSpec(any())); - // JOB_SKIPPED is emitted twice, once for each of the child job - Mockito.verify(this.mockedEventSubmitter, Mockito.times(2)) + // dag should be considered finished because the remaining jobs depend on this failed job, + // so it should have been marked `failed` + Mockito.verify(dagManagementStateStore, Mockito.never()).deleteDag(any()); + Mockito.verify(dagManagementStateStore, Mockito.times(1)).markDagFailed(any()); + + // and the ENFORCE_FLOW_FINISH_DEADLINE dag action should have been deleted + Mockito.verify(dagManagementStateStore, Mockito.times(1)).deleteDagAction(any()); + Mockito.verify(dagManagementStateStore, 
Mockito.times(1)).deleteDagAction( + argThat(dagAction -> dagAction.getDagActionType() == DagActionStore.DagActionType.ENFORCE_FLOW_FINISH_DEADLINE)); + + // a total of four events should be emitted: three JOB_SKIPPED events, one for each of the descendant jobs, and one FLOW_FAILED event + Mockito.verify(this.mockedEventSubmitter, Mockito.times(3)) + .submit(eq(TimingEvent.LauncherTimings.JOB_SKIPPED), anyMap()); + Mockito.verify(this.mockedEventSubmitter, Mockito.times(1)) + .submit(eq(TimingEvent.FlowTimings.FLOW_FAILED), anyMap()); + Mockito.verify(this.mockedEventSubmitter, Mockito.times(4)).submit(any(), anyMap()); + + // this finished dag should not create any new dag actions + Mockito.verify(dagManagementStateStore, Mockito.never()).addJobDagAction(any(), any(), anyLong(), any(), any()); + + doReturn(new ImmutablePair<>(Optional.of(dag.getNodes().get(1)), Optional.of(jobStatus2))) + .when(dagManagementStateStore).getDagNodeWithJobStatus(any()); + ReevaluateDagProc reEvaluateDagProc2 = new ReevaluateDagProc(new ReevaluateDagTask(new DagActionStore.DagAction( + flowGroup, flowName, flowExecutionId, "job1", DagActionStore.DagActionType.REEVALUATE), null, + dagManagementStateStore, mockedDagProcEngineMetrics), ConfigFactory.empty()); + reEvaluateDagProc2.process(dagManagementStateStore, mockedDagProcEngineMetrics); + + // reEvaluateDagProc2 should exit without doing anything because the dag is already deleted + specProducers.forEach(sp -> Mockito.verify(sp, Mockito.never()).addSpec(any())); + Mockito.verify(dagManagementStateStore, Mockito.never()).deleteDag(any()); + Mockito.verify(dagManagementStateStore, Mockito.times(1)).markDagFailed(any()); + Mockito.verify(dagManagementStateStore, Mockito.times(1)).deleteDagAction(any()); + Mockito.verify(dagManagementStateStore, Mockito.times(1)).deleteDagAction( + argThat(dagAction -> dagAction.getDagActionType() == DagActionStore.DagActionType.ENFORCE_FLOW_FINISH_DEADLINE)); + Mockito.verify(this.mockedEventSubmitter, 
Mockito.times(3)) .submit(eq(TimingEvent.LauncherTimings.JOB_SKIPPED), anyMap()); + Mockito.verify(this.mockedEventSubmitter, Mockito.times(1)) + .submit(eq(TimingEvent.FlowTimings.FLOW_FAILED), anyMap()); + Mockito.verify(this.mockedEventSubmitter, Mockito.times(4)).submit(any(), anyMap()); + Mockito.verify(dagManagementStateStore, Mockito.never()).addJobDagAction(any(), any(), anyLong(), any(), any()); - // dag should not have any running jobs now - Assert.assertFalse(dagManagementStateStore.hasRunningJobs(dagId)); + doReturn(new ImmutablePair<>(Optional.of(dag.getNodes().get(2)), Optional.of(jobStatus3))) + .when(dagManagementStateStore).getDagNodeWithJobStatus(any()); + ReevaluateDagProc reEvaluateDagProc3 = new ReevaluateDagProc(new ReevaluateDagTask(new DagActionStore.DagAction( + flowGroup, flowName, flowExecutionId, "job2", DagActionStore.DagActionType.REEVALUATE), null, + dagManagementStateStore, mockedDagProcEngineMetrics), ConfigFactory.empty()); + reEvaluateDagProc3.process(dagManagementStateStore, mockedDagProcEngineMetrics); - // job cancellation should mark the dag failed + // reEvaluateDagProc3 should exit without doing anything because the dag is already deleted + specProducers.forEach(sp -> Mockito.verify(sp, Mockito.never()).addSpec(any())); Mockito.verify(dagManagementStateStore, Mockito.never()).deleteDag(any()); Mockito.verify(dagManagementStateStore, Mockito.times(1)).markDagFailed(any()); + Mockito.verify(dagManagementStateStore, Mockito.times(1)).deleteDagAction(any()); + Mockito.verify(dagManagementStateStore, Mockito.times(1)).deleteDagAction( + argThat(dagAction -> dagAction.getDagActionType() == DagActionStore.DagActionType.ENFORCE_FLOW_FINISH_DEADLINE)); + Mockito.verify(this.mockedEventSubmitter, Mockito.times(3)) + .submit(eq(TimingEvent.LauncherTimings.JOB_SKIPPED), anyMap()); + Mockito.verify(this.mockedEventSubmitter, Mockito.times(1)) + .submit(eq(TimingEvent.FlowTimings.FLOW_FAILED), anyMap()); + 
Mockito.verify(this.mockedEventSubmitter, Mockito.times(4)).submit(any(), anyMap()); + Mockito.verify(dagManagementStateStore, Mockito.never()).addJobDagAction(any(), any(), anyLong(), any(), any()); + + doReturn(new ImmutablePair<>(Optional.of(dag.getNodes().get(3)), Optional.of(jobStatus4))) + .when(dagManagementStateStore).getDagNodeWithJobStatus(any()); + ReevaluateDagProc reEvaluateDagProc4 = new ReevaluateDagProc(new ReevaluateDagTask(new DagActionStore.DagAction( + flowGroup, flowName, flowExecutionId, "job3", DagActionStore.DagActionType.REEVALUATE), null, + dagManagementStateStore, mockedDagProcEngineMetrics), ConfigFactory.empty()); + reEvaluateDagProc4.process(dagManagementStateStore, mockedDagProcEngineMetrics); - // and also remove the ENFORCE_FLOW_FINISH_DEADLINE dag action + // reEvaluateDagProc4 should exit without doing anything because the dag is already deleted + specProducers.forEach(sp -> Mockito.verify(sp, Mockito.never()).addSpec(any())); + Mockito.verify(dagManagementStateStore, Mockito.never()).deleteDag(any()); + Mockito.verify(dagManagementStateStore, Mockito.times(1)).markDagFailed(any()); + Mockito.verify(dagManagementStateStore, Mockito.times(1)).deleteDagAction(any()); Mockito.verify(dagManagementStateStore, Mockito.times(1)).deleteDagAction( argThat(dagAction -> dagAction.getDagActionType() == DagActionStore.DagActionType.ENFORCE_FLOW_FINISH_DEADLINE)); - // this finished dag should not create any new dag actions + Mockito.verify(this.mockedEventSubmitter, Mockito.times(3)) + .submit(eq(TimingEvent.LauncherTimings.JOB_SKIPPED), anyMap()); + Mockito.verify(this.mockedEventSubmitter, Mockito.times(1)) + .submit(eq(TimingEvent.FlowTimings.FLOW_FAILED), anyMap()); + Mockito.verify(this.mockedEventSubmitter, Mockito.times(4)).submit(any(), anyMap()); Mockito.verify(dagManagementStateStore, Mockito.never()).addJobDagAction(any(), any(), anyLong(), any(), any()); }