add tests
arjun4084346 committed Sep 9, 2024
1 parent 96b36c4 commit 43d53e5
Showing 14 changed files with 168 additions and 77 deletions.
@@ -77,6 +77,7 @@ public static class FlowTimings {
public static final String FLOW_RUN_DEADLINE_EXCEEDED = "FlowRunDeadlineExceeded";
public static final String FLOW_START_DEADLINE_EXCEEDED = "FlowStartDeadlineExceeded";
public static final String FLOW_PENDING_RESUME = "FlowPendingResume";
+ public static final String FLOW_SKIPPED = "FlowSkipped";
}

public static class FlowEventConstants {
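For context on how the new FLOW_SKIPPED constant gets used: a GobblinTrackingEvent with this name is produced by creating a TimingEvent from an EventSubmitter and stopping it with the flow metadata, the same pattern the validateAndHandleConcurrentExecution hunk further down adopts. Below is a minimal sketch of that emission; the class and method names (FlowSkippedEventExample, emitFlowSkipped) and the import paths are illustrative assumptions, while the TimingEvent constructor, the METADATA_MESSAGE key, and the stop(...) call mirror the usage visible in this diff.

import java.util.Map;

import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.metrics.event.TimingEvent;

public class FlowSkippedEventExample {
  // Emits a FLOW_SKIPPED GTE for a flow that will not run, e.g. because another
  // instance is already running and concurrent executions are disabled.
  public static void emitFlowSkipped(EventSubmitter eventSubmitter, Map<String, String> flowMetadata) {
    flowMetadata.put(TimingEvent.METADATA_MESSAGE, "Flow is skipped because another instance is running and "
        + "concurrent executions are disabled.");
    // Creating the timing event and stopping it immediately submits the tracking event
    // with the supplied metadata.
    new TimingEvent(eventSubmitter, TimingEvent.FlowTimings.FLOW_SKIPPED).stop(flowMetadata);
  }
}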
@@ -836,7 +836,7 @@ public void testObservabilityEventFlowFailed() throws IOException, ReflectiveOpe
jobStatusMonitor.shutDown();
}

- @Test// (dependsOnMethods = "testObservabilityEventFlowFailed")
+ @Test
public void testProcessMessageForSkippedEvent() throws IOException, ReflectiveOperationException {
DagManagementStateStore dagManagementStateStore = mock(DagManagementStateStore.class);
KafkaEventReporter kafkaReporter = builder.build("localhost:0000", "topic8");
@@ -43,7 +43,6 @@
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.flowgraph.DagNodeId;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
- import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
import org.apache.gobblin.service.monitoring.JobStatus;
import org.apache.gobblin.service.monitoring.JobStatusRetriever;
import org.apache.gobblin.util.ConfigUtils;
@@ -157,7 +157,7 @@ public static void submitJobToExecutor(DagManagementStateStore dagManagementStat
log.info("Submitted job {} for dagId {}", DagUtils.getJobName(dagNode), dagId);
}

- public static void cancelDagNode(Dag.DagNode<JobExecutionPlan> dagNodeToCancel, DagManagementStateStore dagManagementStateStore) throws IOException {
+ public static void cancelDagNode(Dag.DagNode<JobExecutionPlan> dagNodeToCancel) throws IOException {
Properties cancelJobArgs = new Properties();
String serializedFuture = null;

@@ -185,22 +185,31 @@ public static void cancelDagNode(Dag.DagNode<JobExecutionPlan> dagNodeToCancel,
/**
* Emits a JOB_SKIPPED GTE for each dependent job.
*/
- public static void sendSkippedEventForDependentJobs(Dag<JobExecutionPlan> dag, Dag.DagNode<JobExecutionPlan> node,
- DagManagementStateStore dagManagementStateStore) throws IOException {
- for (Dag.DagNode<JobExecutionPlan> child : dag.getChildren(node)) {
- child.getValue().setExecutionStatus(SKIPPED);
- dagManagementStateStore.updateDagNode(child);
- Map<String, String> jobMetadata = TimingEventUtils.getJobMetadata(Maps.newHashMap(), child.getValue());
+ public static void sendSkippedEventForDependentJobs(Dag<JobExecutionPlan> dag, Dag.DagNode<JobExecutionPlan> node) {
+ Set<Dag.DagNode<JobExecutionPlan>> dependentJobs = new HashSet<>();
+ findDependentJobs(dag, node, dependentJobs);
+ for (Dag.DagNode<JobExecutionPlan> dependentJob : dependentJobs) {
+ Map<String, String> jobMetadata = TimingEventUtils.getJobMetadata(Maps.newHashMap(), dependentJob.getValue());
DagProc.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_SKIPPED).stop(jobMetadata);
}
}

- public static void cancelDag(Dag<JobExecutionPlan> dag, DagManagementStateStore dagManagementStateStore) throws IOException {
+ private static void findDependentJobs(Dag<JobExecutionPlan> dag,
+ Dag.DagNode<JobExecutionPlan> node, Set<Dag.DagNode<JobExecutionPlan>> dependentJobs) {
+ for (Dag.DagNode<JobExecutionPlan> child : dag.getChildren(node)) {
+ if (!dependentJobs.contains(child)) {
+ dependentJobs.add(child);
+ findDependentJobs(dag, child, dependentJobs);
+ }
+ }
+ }
+
+ public static void cancelDag(Dag<JobExecutionPlan> dag) throws IOException {
List<Dag.DagNode<JobExecutionPlan>> dagNodesToCancel = dag.getNodes();
log.info("Found {} DagNodes to cancel (DagId {}).", dagNodesToCancel.size(), DagUtils.generateDagId(dag));

for (Dag.DagNode<JobExecutionPlan> dagNodeToCancel : dagNodesToCancel) {
- DagProcUtils.cancelDagNode(dagNodeToCancel, dagManagementStateStore);
+ DagProcUtils.cancelDagNode(dagNodeToCancel);
}
}

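The new findDependentJobs helper collects the transitive set of downstream jobs instead of only the direct children the old loop visited, so a JOB_SKIPPED event is emitted for every job that can no longer run. Below is a self-contained sketch of the same traversal over a plain adjacency-map DAG; the Dag/DagNode types are replaced by simple strings, so this illustrates the traversal logic rather than the Gobblin API.

import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class DependentJobsExample {
  // Collects every job reachable from 'node' (children, grandchildren, ...), guarding
  // against revisiting a node that is reachable through more than one parent.
  static void findDependentJobs(Map<String, List<String>> childrenByJob, String node, Set<String> dependentJobs) {
    for (String child : childrenByJob.getOrDefault(node, List.of())) {
      if (dependentJobs.add(child)) {   // add() returns false if the child was already visited
        findDependentJobs(childrenByJob, child, dependentJobs);
      }
    }
  }

  public static void main(String[] args) {
    // job1 -> {job2, job3}, and both job2 and job3 feed job4 (a diamond dependency).
    Map<String, List<String>> dag = Map.of(
        "job1", List.of("job2", "job3"),
        "job2", List.of("job4"),
        "job3", List.of("job4"));
    Set<String> dependents = new HashSet<>();
    findDependentJobs(dag, "job1", dependents);
    System.out.println(dependents);   // e.g. [job2, job3, job4]; job4 appears only once
  }
}

The visited-set check keeps the traversal linear even on diamond-shaped dependency graphs, which is why JOB_SKIPPED is not emitted twice for a shared descendant.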
@@ -57,7 +57,7 @@ protected void enforceDeadline(DagManagementStateStore dagManagementStateStore,
log.info("Found {} DagNodes to cancel (DagId {}).", dagNodesToCancel.size(), getDagId());

for (Dag.DagNode<JobExecutionPlan> dagNodeToCancel : dagNodesToCancel) {
- DagProcUtils.cancelDagNode(dagNodeToCancel, dagManagementStateStore);
+ DagProcUtils.cancelDagNode(dagNodeToCancel);
}

dag.setFlowEvent(TimingEvent.FlowTimings.FLOW_RUN_DEADLINE_EXCEEDED);
@@ -78,7 +78,7 @@ protected void enforceDeadline(DagManagementStateStore dagManagementStateStore,
log.info("Job exceeded the job start deadline. Killing it now. Job - {}, jobOrchestratedTime - {}, timeOutForJobStart - {}",
DagUtils.getJobName(dagNode), jobOrchestratedTime, timeOutForJobStart);
dagManagementStateStore.getDagManagerMetrics().incrementCountsStartSlaExceeded(dagNode);
- DagProcUtils.cancelDagNode(dagNode, dagManagementStateStore);
+ DagProcUtils.cancelDagNode(dagNode);
dag.setFlowEvent(TimingEvent.FlowTimings.FLOW_START_DEADLINE_EXCEEDED);
dag.setMessage("Flow killed because no update received for " + timeOutForJobStart + " ms after orchestration");
}
@@ -69,13 +69,13 @@ protected void act(DagManagementStateStore dagManagementStateStore, Optional<Dag
if (this.shouldKillSpecificJob) {
Optional<Dag.DagNode<JobExecutionPlan>> dagNodeToCancel = dagManagementStateStore.getDagNodeWithJobStatus(this.dagNodeId).getLeft();
if (dagNodeToCancel.isPresent()) {
- DagProcUtils.cancelDagNode(dagNodeToCancel.get(), dagManagementStateStore);
+ DagProcUtils.cancelDagNode(dagNodeToCancel.get());
} else {
dagProcEngineMetrics.markDagActionsAct(getDagActionType(), false);
log.error("Did not find Dag node with id {}, it might be already cancelled/finished and thus cleaned up from the store.", getDagNodeId());
}
} else {
- DagProcUtils.cancelDag(dag.get(), dagManagementStateStore);
+ DagProcUtils.cancelDag(dag.get());
}
dagProcEngineMetrics.markDagActionsAct(getDagActionType(), true);
}
@@ -80,7 +80,7 @@ protected Optional<Dag<JobExecutionPlan>> initialize(DagManagementStateStore dag
protected void act(DagManagementStateStore dagManagementStateStore, Optional<Dag<JobExecutionPlan>> dag,
DagProcessingEngineMetrics dagProcEngineMetrics) throws IOException {
if (!dag.isPresent()) {
log.warn("Dag with id " + getDagId() + " could not be compiled.");
log.warn("Dag with id " + getDagId() + " could not be compiled or cannot run concurrently.");
dagProcEngineMetrics.markDagActionsAct(getDagActionType(), false);
} else {
DagProcUtils.submitNextNodes(dagManagementStateStore, dag.get(), getDagId());
@@ -60,8 +60,11 @@ protected Pair<Optional<Dag.DagNode<JobExecutionPlan>>, Optional<JobStatus>> ini
protected void act(DagManagementStateStore dagManagementStateStore, Pair<Optional<Dag.DagNode<JobExecutionPlan>>,
Optional<JobStatus>> dagNodeWithJobStatus, DagProcessingEngineMetrics dagProcEngineMetrics) throws IOException {
if (!dagNodeWithJobStatus.getLeft().isPresent()) {
- // one of the reason this could arise is when the MALA leasing doesn't work cleanly and another DagProc::process
- // has cleaned up the Dag, yet did not complete the lease before this current one acquired its own
+ // One of the reasons this could arise is when the MALA leasing doesn't work cleanly and another DagProc::process
+ // has cleaned up the Dag, yet did not complete the lease before this current one acquired its own.
+ // Another reason could be that LaunchDagProc was unable to compile the FlowSpec, or that the flow cannot run concurrently.
+ // In those cases a FLOW_FAILED or FLOW_SKIPPED event is emitted respectively; these are terminal statuses and
+ // create a ReevaluateDagProc, but the Dag was never created or never saved.
log.error("DagNode or its job status not found for a Reevaluate DagAction with dag node id {}", this.dagNodeId);
dagProcEngineMetrics.markDagActionsAct(getDagActionType(), false);
return;
@@ -99,6 +102,8 @@ protected void act(DagManagementStateStore dagManagementStateStore, Pair<Optiona
// The other ReevaluateDagProc can do that purely out of race condition when the dag is cancelled and ReevaluateDagProcs
// are being processed for dag node kill requests; or when this DagProc ran into some exception after updating the
// status and thus gave the other ReevaluateDagProc sufficient time to delete the dag before being retried.
+ // This can also happen when a job is cancelled/failed and the dag is cleaned up, but we are still processing
+ // Reevaluate dag actions for the SKIPPED dependent jobs.
log.warn("Dag not found {}", getDagId());
return;
}
@@ -159,12 +164,12 @@ private void onJobFinish(DagManagementStateStore dagManagementStateStore, Dag.Da
dag.setMessage("Flow failed because job " + jobName + " failed");
dag.setFlowEvent(TimingEvent.FlowTimings.FLOW_FAILED);
dagManagementStateStore.getDagManagerMetrics().incrementExecutorFailed(dagNode);
- DagProcUtils.sendSkippedEventForDependentJobs(dag, dagNode, dagManagementStateStore);
+ DagProcUtils.sendSkippedEventForDependentJobs(dag, dagNode);
break;
case CANCELLED:
case SKIPPED:
dag.setFlowEvent(TimingEvent.FlowTimings.FLOW_CANCELLED);
- DagProcUtils.sendSkippedEventForDependentJobs(dag, dagNode, dagManagementStateStore);
+ DagProcUtils.sendSkippedEventForDependentJobs(dag, dagNode);
break;
case COMPLETE:
dagManagementStateStore.getDagManagerMetrics().incrementExecutorSuccess(dagNode);
@@ -171,10 +171,10 @@ public Optional<Dag<JobExecutionPlan>> validateAndHandleConcurrentExecution(Conf
quotaManager.releaseQuota(dagNode);
}
}
- // Send FLOW_FAILED event
- flowMetadata.put(TimingEvent.METADATA_MESSAGE, "Flow failed because another instance is running and concurrent "
+ // Send FLOW_SKIPPED event
+ flowMetadata.put(TimingEvent.METADATA_MESSAGE, "Flow is skipped because another instance is running and concurrent "
    + "executions are disabled. Set flow.allowConcurrentExecution to true in the flowSpec to change this behaviour.");
- new TimingEvent(eventSubmitter, TimingEvent.FlowTimings.FLOW_FAILED).stop(flowMetadata);
+ new TimingEvent(eventSubmitter, TimingEvent.FlowTimings.FLOW_SKIPPED).stop(flowMetadata);
return Optional.absent();
}
}
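The skip-instead-of-fail path above only triggers when concurrent executions are disallowed; the message points users at the flow.allowConcurrentExecution flag. Below is a minimal sketch of reading that flag from a flow config with the Typesafe Config API; the key name comes from the message above, while the helper name, default behaviour, and surrounding code are assumptions rather than this class's actual logic.

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

public class ConcurrentExecutionFlagExample {
  // Returns whether the flow opts in to concurrent executions; treating an absent
  // key as false is an assumption made for this illustration.
  static boolean allowsConcurrentExecution(Config flowConfig) {
    return flowConfig.hasPath("flow.allowConcurrentExecution")
        && flowConfig.getBoolean("flow.allowConcurrentExecution");
  }

  public static void main(String[] args) {
    Config flowConfig = ConfigFactory.parseString("flow.allowConcurrentExecution = true");
    System.out.println(allowsConcurrentExecution(flowConfig));   // true
  }
}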
@@ -141,6 +141,7 @@ public org.apache.gobblin.configuration.State parseJobStatus(GobblinTrackingEven
case TimingEvent.LauncherTimings.JOB_PENDING:
properties.put(JobStatusRetriever.EVENT_NAME_FIELD, ExecutionStatus.PENDING.name());
break;
+ case TimingEvent.FlowTimings.FLOW_SKIPPED:
case TimingEvent.LauncherTimings.JOB_SKIPPED:
properties.put(JobStatusRetriever.EVENT_NAME_FIELD, ExecutionStatus.SKIPPED.name());
break;
@@ -115,8 +115,8 @@ public abstract class KafkaJobStatusMonitor extends HighLevelConsumer<byte[], by
KAFKA_AUTO_OFFSET_RESET_KEY, KAFKA_AUTO_OFFSET_RESET_SMALLEST));

private static final List<ExecutionStatus> ORDERED_EXECUTION_STATUSES = ImmutableList
- .of(ExecutionStatus.COMPILED, ExecutionStatus.PENDING, ExecutionStatus.SKIPPED, ExecutionStatus.PENDING_RESUME,
- ExecutionStatus.PENDING_RETRY, ExecutionStatus.ORCHESTRATED, ExecutionStatus.RUNNING, ExecutionStatus.COMPLETE,
+ .of(ExecutionStatus.COMPILED, ExecutionStatus.PENDING, ExecutionStatus.PENDING_RESUME, ExecutionStatus.PENDING_RETRY,
+ ExecutionStatus.ORCHESTRATED, ExecutionStatus.RUNNING, ExecutionStatus.SKIPPED, ExecutionStatus.COMPLETE,
ExecutionStatus.FAILED, ExecutionStatus.CANCELLED);

private final JobIssueEventHandler jobIssueEventHandler;
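The reordering above moves SKIPPED from just after PENDING to just after RUNNING, so a SKIPPED event now ranks later than the in-flight statuses. An ordered status list like this is typically used to ignore out-of-order updates by comparing positions; the sketch below illustrates that idea with a local enum and is not the actual KafkaJobStatusMonitor logic.

import java.util.List;

public class StatusOrderingExample {
  enum ExecutionStatus {
    COMPILED, PENDING, PENDING_RESUME, PENDING_RETRY, ORCHESTRATED, RUNNING, SKIPPED, COMPLETE, FAILED, CANCELLED
  }

  private static final List<ExecutionStatus> ORDERED = List.of(
      ExecutionStatus.COMPILED, ExecutionStatus.PENDING, ExecutionStatus.PENDING_RESUME, ExecutionStatus.PENDING_RETRY,
      ExecutionStatus.ORCHESTRATED, ExecutionStatus.RUNNING, ExecutionStatus.SKIPPED, ExecutionStatus.COMPLETE,
      ExecutionStatus.FAILED, ExecutionStatus.CANCELLED);

  // Keep the stored status unless the incoming one is at least as far along in the ordering.
  static ExecutionStatus merge(ExecutionStatus stored, ExecutionStatus incoming) {
    return ORDERED.indexOf(incoming) >= ORDERED.indexOf(stored) ? incoming : stored;
  }

  public static void main(String[] args) {
    // With SKIPPED placed after RUNNING, a late-arriving RUNNING event no longer overwrites SKIPPED.
    System.out.println(merge(ExecutionStatus.SKIPPED, ExecutionStatus.RUNNING));   // SKIPPED
  }
}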
@@ -19,7 +19,6 @@

import java.io.IOException;
import java.net.URISyntaxException;
- import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
@@ -225,20 +224,5 @@ public void killDagNode() throws IOException, URISyntaxException, InterruptedExc
.submit(eq(TimingEvent.LauncherTimings.JOB_CANCEL), anyMap());
Mockito.verify(this.mockedEventSubmitter, Mockito.times(numOfCancelledFlows))
.submit(eq(TimingEvent.FlowTimings.FLOW_CANCELLED), anyMap());

- Assert.assertEquals(dag.getNodes().get(0).getValue().getExecutionStatus(), ExecutionStatus.ORCHESTRATED); // because this was a mocked dag and we launched this job
- Assert.assertEquals(dag.getNodes().get(1).getValue().getExecutionStatus(), ExecutionStatus.PENDING); // because this was a mocked dag and we did not launch the job
- Assert.assertEquals(dag.getNodes().get(2).getValue().getExecutionStatus(), ExecutionStatus.CANCELLED); // because we cancelled this job
- Assert.assertEquals(dag.getNodes().get(3).getValue().getExecutionStatus(), ExecutionStatus.PENDING); // because this was a mocked dag and we did not launch the job
- Assert.assertEquals(dag.getNodes().get(4).getValue().getExecutionStatus(), ExecutionStatus.SKIPPED); // because its parent job was cancelled
-
- Assert.assertTrue(dagManagementStateStore.hasRunningJobs(dagId));
-
- dag.getNodes().get(0).getValue().setExecutionStatus(ExecutionStatus.COMPLETE);
- dag.getNodes().get(1).getValue().setExecutionStatus(ExecutionStatus.COMPLETE);
- dag.getNodes().get(3).getValue().setExecutionStatus(ExecutionStatus.COMPLETE);
-
- doReturn(new HashSet<>(dag.getNodes())).when(dagManagementStateStore).getDagNodes(any());
- Assert.assertFalse(dagManagementStateStore.hasRunningJobs(dagId));
}
}