
add a new status SKIPPED for skipped jobs and flows
fix merge conflicts
arjun4084346 committed Sep 8, 2024
1 parent 45ad13e commit 96b36c4
Showing 19 changed files with 246 additions and 29 deletions.

@@ -279,6 +279,7 @@ private void runJobExecutionLauncher() throws JobException {

try {
if (planningJobIdFromStore.isPresent() && !canRun(planningJobIdFromStore.get(), planningJobHelixManager)) {
+ // todo it should emit SKIPPED_JOB event that sets the job status SKIPPED rather than CANCELLED
TimingEvent timer = new TimingEvent(eventSubmitter, TimingEvent.JOB_SKIPPED_TIME);
HashMap<String, String> metadata = new HashMap<>(Tag.toMap(Tag.tagValuesToString(
HelixUtils.initBaseEventTags(jobProps, Lists.newArrayList()))));
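The TODO above is the crux of this hunk: this launcher path still emits the legacy JOB_SKIPPED_TIME timer, which downstream status handling surfaces as CANCELLED. A minimal sketch of the intended follow-up — not code from this commit, and assuming only the fields already in scope here (eventSubmitter, jobProps) plus the JobSkipped launcher timing added below:

// Hypothetical follow-up to the TODO: emit the new launcher timing so the
// job-status monitor records SKIPPED instead of CANCELLED.
TimingEvent skippedTimer = new TimingEvent(eventSubmitter, TimingEvent.LauncherTimings.JOB_SKIPPED);
HashMap<String, String> metadata = new HashMap<>(Tag.toMap(Tag.tagValuesToString(
    HelixUtils.initBaseEventTags(jobProps, Lists.newArrayList()))));
skippedTimer.stop(metadata); // stop() emits the GobblinTrackingEvent carrying this metadata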

@@ -43,6 +43,7 @@ public static class LauncherTimings {
public static final String JOB_PENDING_RESUME = "JobPendingResume";
public static final String JOB_ORCHESTRATED = "JobOrchestrated";
public static final String JOB_PREPARE = "JobPrepareTimer";
+ public static final String JOB_SKIPPED = "JobSkipped";
public static final String JOB_START = "JobStartTimer";
public static final String JOB_RUN = "JobRunTimer";
public static final String JOB_COMMIT = "JobCommitTimer";
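The new constant is emitted like any other launcher timing; the DagProcUtils change later in this commit does exactly this:

DagProc.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_SKIPPED).stop(jobMetadata);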

@@ -308,7 +308,7 @@ public void testProcessMessageForSkippedFlow() throws IOException, ReflectiveOperationException {
ImmutableList.of(
createFlowCompiledEvent(),
createJobOrchestratedEvent(1, 2),
- createJobSkippedEvent()
+ createJobSkippedTimeEvent()
).forEach(event -> {
context.submitEvent(event);
kafkaReporter.report();
@@ -836,6 +836,40 @@ public void testObservabilityEventFlowFailed() throws IOException, ReflectiveOperationException {
jobStatusMonitor.shutDown();
}

+ @Test // (dependsOnMethods = "testObservabilityEventFlowFailed")
+ public void testProcessMessageForSkippedEvent() throws IOException, ReflectiveOperationException {
+   DagManagementStateStore dagManagementStateStore = mock(DagManagementStateStore.class);
+   KafkaEventReporter kafkaReporter = builder.build("localhost:0000", "topic8");
+
+   // Submit GobblinTrackingEvents to Kafka
+   ImmutableList.of(
+       createJobSkippedEvent()
+   ).forEach(event -> {
+     context.submitEvent(event);
+     kafkaReporter.report();
+   });
+
+   try {
+     Thread.sleep(1000);
+   } catch (InterruptedException ex) {
+     Thread.currentThread().interrupt();
+   }
+
+   MockKafkaAvroJobStatusMonitor jobStatusMonitor = createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false),
+       ConfigFactory.empty(), new NoopGaaSJobObservabilityEventProducer(), dagManagementStateStore);
+   jobStatusMonitor.buildMetricsContextAndMetrics();
+   Iterator<DecodeableKafkaRecord<byte[], byte[]>> recordIterator = Iterators.transform(
+       this.kafkaTestHelper.getIteratorForTopic(TOPIC),
+       this::convertMessageAndMetadataToDecodableKafkaRecord);
+
+   State state = getNextJobStatusState(jobStatusMonitor, recordIterator, this.jobGroup, this.jobName);
+   Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.SKIPPED.name());
+   Mockito.verify(dagManagementStateStore, Mockito.times(1)).addJobDagAction(
+       any(), any(), anyLong(), any(), eq(DagActionStore.DagActionType.REEVALUATE));
+
+   jobStatusMonitor.shutDown();
+ }

private State getNextJobStatusState(MockKafkaAvroJobStatusMonitor jobStatusMonitor, Iterator<DecodeableKafkaRecord<byte[], byte[]>> recordIterator,
String jobGroup, String jobName) throws IOException {
jobStatusMonitor.processMessage(recordIterator.next());
@@ -871,11 +905,15 @@ private GobblinTrackingEvent createJobOrchestratedEvent(int currentAttempt, int
return createGTE(TimingEvent.LauncherTimings.JOB_ORCHESTRATED, metadata);
}

+ private GobblinTrackingEvent createJobSkippedEvent() {
+   return createGTE(TimingEvent.LauncherTimings.JOB_SKIPPED, Maps.newHashMap());
+ }

private GobblinTrackingEvent createJobStartEvent() {
return createGTE(TimingEvent.LauncherTimings.JOB_START, Maps.newHashMap());
}

- private GobblinTrackingEvent createJobSkippedEvent() {
+ private GobblinTrackingEvent createJobSkippedTimeEvent() {
return createGTE(TimingEvent.JOB_SKIPPED_TIME, Maps.newHashMap());
}

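The rename makes the two "skipped" events easier to tell apart: createJobSkippedTimeEvent() builds the pre-existing TimingEvent.JOB_SKIPPED_TIME timer (which, per the TODO in the Helix launcher above, still surfaces as CANCELLED), while the new createJobSkippedEvent() builds the LauncherTimings.JOB_SKIPPED ("JobSkipped") event that the monitor now maps to SKIPPED:

createGTE(TimingEvent.LauncherTimings.JOB_SKIPPED, Maps.newHashMap()); // parsed to ExecutionStatus.SKIPPED
createGTE(TimingEvent.JOB_SKIPPED_TIME, Maps.newHashMap());            // legacy launcher timer event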

@@ -49,4 +49,9 @@ enum ExecutionStatus {
* Flow cancelled.
*/
CANCELLED

+ /**
+  * Flow or job is skipped.
+  */
+ SKIPPED
}

@@ -13,7 +13,7 @@
"name" : "ExecutionStatus",
"namespace" : "org.apache.gobblin.service",
"doc" : "Execution status for a flow or job",
"symbols" : [ "COMPILED", "PENDING", "PENDING_RETRY", "PENDING_RESUME", "ORCHESTRATED", "RUNNING", "COMPLETE", "FAILED", "CANCELLED" ],
"symbols" : [ "COMPILED", "PENDING", "PENDING_RETRY", "PENDING_RESUME", "ORCHESTRATED", "RUNNING", "COMPLETE", "FAILED", "CANCELLED", "SKIPPED" ],
"symbolDocs" : {
"CANCELLED" : "Flow cancelled.",
"COMPILED" : "Flow compiled to jobs.",
@@ -23,7 +23,8 @@
"PENDING" : "Flow or job is in pending state.",
"PENDING_RESUME" : "Flow or job is currently resuming.",
"PENDING_RETRY" : "Flow or job is pending retry.",
"RUNNING" : "Flow or job is currently executing"
"RUNNING" : "Flow or job is currently executing.",
"SKIPPED" : "Flow or job is skipped."
}
}, {
"type" : "record",

@@ -13,7 +13,7 @@
"name" : "ExecutionStatus",
"namespace" : "org.apache.gobblin.service",
"doc" : "Execution status for a flow or job",
"symbols" : [ "COMPILED", "PENDING", "PENDING_RETRY", "PENDING_RESUME", "ORCHESTRATED", "RUNNING", "COMPLETE", "FAILED", "CANCELLED" ],
"symbols" : [ "COMPILED", "PENDING", "PENDING_RETRY", "PENDING_RESUME", "ORCHESTRATED", "RUNNING", "COMPLETE", "FAILED", "CANCELLED", "SKIPPED" ],
"symbolDocs" : {
"CANCELLED" : "Flow cancelled.",
"COMPILED" : "Flow compiled to jobs.",
@@ -23,7 +23,8 @@
"PENDING" : "Flow or job is in pending state.",
"PENDING_RESUME" : "Flow or job is currently resuming.",
"PENDING_RETRY" : "Flow or job is pending retry.",
"RUNNING" : "Flow or job is currently executing"
"RUNNING" : "Flow or job is currently executing.",
"SKIPPED" : "Flow or job is skipped."
}
}, {
"type" : "record",

@@ -41,7 +41,7 @@
@Slf4j
public class FlowStatusGenerator {
public static final List<String> FINISHED_STATUSES = Lists.newArrayList(ExecutionStatus.FAILED.name(),
- ExecutionStatus.COMPLETE.name(), ExecutionStatus.CANCELLED.name());
+ ExecutionStatus.COMPLETE.name(), ExecutionStatus.CANCELLED.name(), ExecutionStatus.SKIPPED.name());
public static final int MAX_LOOKBACK = 100;

private final JobStatusRetriever jobStatusRetriever;
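Because FINISHED_STATUSES now contains SKIPPED, any completeness check built on this list treats a skipped flow as terminal. Illustrative (not the exact FlowStatusGenerator code):

boolean finished = FlowStatusGenerator.FINISHED_STATUSES.contains(ExecutionStatus.SKIPPED.name()); // true after this change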

@@ -99,7 +99,6 @@ public interface DagManagementStateStore {
* {@link DagManagementStateStore#addDag}. This call is just an additional identifier which may be used
* for DagNode level operations. In the future, it may be merged with checkpointDag.
* @param dagNode dag node to be added
- * @param dagId dag id of the dag this dag node belongs to
*/
void updateDagNode(Dag.DagNode<JobExecutionPlan> dagNode) throws IOException;


@@ -164,7 +164,8 @@ public static Set<DagNode<JobExecutionPlan>> getNext(Dag<JobExecutionPlan> dag)
DagNode<JobExecutionPlan> node = nodesToExpand.poll();
ExecutionStatus executionStatus = getExecutionStatus(node);
boolean addFlag = true;
- if (executionStatus == PENDING || executionStatus == PENDING_RETRY || executionStatus == PENDING_RESUME) {
+ if (executionStatus == PENDING || executionStatus == PENDING_RETRY || executionStatus == PENDING_RESUME ||
+     executionStatus == SKIPPED) {
//Add a node to be executed next, only if all of its parent nodes are COMPLETE.
List<DagNode<JobExecutionPlan>> parentNodes = dag.getParents(node);
for (DagNode<JobExecutionPlan> parentNode : parentNodes) {

@@ -43,6 +43,7 @@
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.flowgraph.DagNodeId;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+ import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
import org.apache.gobblin.service.monitoring.JobStatus;
import org.apache.gobblin.service.monitoring.JobStatusRetriever;
import org.apache.gobblin.util.ConfigUtils;

@@ -37,7 +37,6 @@

import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metrics.GobblinTrackingEvent;
- import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.metrics.event.TimingEvent;
import org.apache.gobblin.runtime.api.JobSpec;
import org.apache.gobblin.runtime.api.Spec;
@@ -183,6 +182,19 @@ public static void cancelDagNode(Dag.DagNode<JobExecutionPlan> dagNodeToCancel,
}
}

+ /**
+  * Emits a JOB_SKIPPED GTE for each of the dependent jobs.
+  */
+ public static void sendSkippedEventForDependentJobs(Dag<JobExecutionPlan> dag, Dag.DagNode<JobExecutionPlan> node,
+     DagManagementStateStore dagManagementStateStore) throws IOException {
+   for (Dag.DagNode<JobExecutionPlan> child : dag.getChildren(node)) {
+     child.getValue().setExecutionStatus(SKIPPED);
+     dagManagementStateStore.updateDagNode(child);
+     Map<String, String> jobMetadata = TimingEventUtils.getJobMetadata(Maps.newHashMap(), child.getValue());
+     DagProc.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_SKIPPED).stop(jobMetadata);
+   }
+ }

public static void cancelDag(Dag<JobExecutionPlan> dag, DagManagementStateStore dagManagementStateStore) throws IOException {
List<Dag.DagNode<JobExecutionPlan>> dagNodesToCancel = dag.getNodes();
log.info("Found {} DagNodes to cancel (DagId {}).", dagNodesToCancel.size(), DagUtils.generateDagId(dag));
@@ -202,7 +214,7 @@ private static void sendJobCancellationEvent(Dag.DagNode<JobExecutionPlan> dagNo
* Sets {@link Dag#flowEvent} and emits a {@link GobblinTrackingEvent} of the provided
* flow event type.
*/
- public static void setAndEmitFlowEvent(EventSubmitter eventSubmitter, Dag<JobExecutionPlan> dag, String flowEvent) {
+ public static void setAndEmitFlowEvent(Dag<JobExecutionPlan> dag, String flowEvent) {
if (!dag.isEmpty()) {
// Every dag node will contain the same flow metadata
Config config = DagUtils.getDagJobConfig(dag);
@@ -213,7 +225,7 @@ public static void setAndEmitFlowEvent(EventSubmitter eventSubmitter, Dag<JobExe
flowMetadata.put(TimingEvent.METADATA_MESSAGE, dag.getMessage());
}

- eventSubmitter.getTimingEvent(flowEvent).stop(flowMetadata);
+ DagProc.eventSubmitter.getTimingEvent(flowEvent).stop(flowMetadata);
}
}


@@ -64,7 +64,7 @@ protected void act(DagManagementStateStore dagManagementStateStore, Optional<Dag
}

dag.get().setMessage("Flow killed by request");
- DagProcUtils.setAndEmitFlowEvent(eventSubmitter, dag.get(), TimingEvent.FlowTimings.FLOW_CANCELLED);
+ DagProcUtils.setAndEmitFlowEvent(dag.get(), TimingEvent.FlowTimings.FLOW_CANCELLED);

if (this.shouldKillSpecificJob) {
Optional<Dag.DagNode<JobExecutionPlan>> dagNodeToCancel = dagManagementStateStore.getDagNodeWithJobStatus(this.dagNodeId).getLeft();

@@ -84,7 +84,7 @@ protected void act(DagManagementStateStore dagManagementStateStore, Optional<Dag
dagProcEngineMetrics.markDagActionsAct(getDagActionType(), false);
} else {
DagProcUtils.submitNextNodes(dagManagementStateStore, dag.get(), getDagId());
- DagProcUtils.setAndEmitFlowEvent(eventSubmitter, dag.get(), TimingEvent.FlowTimings.FLOW_RUNNING);
+ DagProcUtils.setAndEmitFlowEvent(dag.get(), TimingEvent.FlowTimings.FLOW_RUNNING);
dagManagementStateStore.getDagManagerMetrics().conditionallyMarkFlowAsState(DagUtils.getFlowId(dag.get()),
Dag.FlowState.RUNNING);
DagProcUtils.sendEnforceFlowFinishDeadlineDagAction(dagManagementStateStore, getDagTask().getDagAction());

@@ -117,7 +117,7 @@ protected void act(DagManagementStateStore dagManagementStateStore, Pair<Optiona
} else if (DagProcUtils.isDagFinished(dag)) {
String flowEvent = DagProcUtils.calcFlowStatus(dag);
dag.setFlowEvent(flowEvent);
- DagProcUtils.setAndEmitFlowEvent(eventSubmitter, dag, flowEvent);
+ DagProcUtils.setAndEmitFlowEvent(dag, flowEvent);
if (flowEvent.equals(TimingEvent.FlowTimings.FLOW_SUCCEEDED)) {
// todo - verify if work from PR#3641 is required
dagManagementStateStore.deleteDag(getDagId());
@@ -159,9 +159,12 @@ private void onJobFinish(DagManagementStateStore dagManagementStateStore, Dag.Da
dag.setMessage("Flow failed because job " + jobName + " failed");
dag.setFlowEvent(TimingEvent.FlowTimings.FLOW_FAILED);
dagManagementStateStore.getDagManagerMetrics().incrementExecutorFailed(dagNode);
+ DagProcUtils.sendSkippedEventForDependentJobs(dag, dagNode, dagManagementStateStore);
break;
case CANCELLED:
+ case SKIPPED:
dag.setFlowEvent(TimingEvent.FlowTimings.FLOW_CANCELLED);
+ DagProcUtils.sendSkippedEventForDependentJobs(dag, dagNode, dagManagementStateStore);
break;
case COMPLETE:
dagManagementStateStore.getDagManagerMetrics().incrementExecutorSuccess(dagNode);
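Note that sendSkippedEventForDependentJobs only touches the direct children of the finished node. The cascade to deeper descendants appears to be indirect: each child's SKIPPED status triggers a REEVALUATE dag action (the new monitor test verifies this), which re-enters this switch under case SKIPPED and skips that child's children in turn — roughly, for a linear dag job1 -> job2 -> job3:

// onJobFinish(job1, FAILED)  -> job2 set to SKIPPED, JOB_SKIPPED GTE emitted
// onJobFinish(job2, SKIPPED) -> job3 set to SKIPPED, JOB_SKIPPED GTE emitted
DagProcUtils.sendSkippedEventForDependentJobs(dag, dagNode, dagManagementStateStore);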

@@ -71,7 +71,7 @@ protected void act(DagManagementStateStore dagManagementStateStore, Optional<Dag
long flowResumeTime = System.currentTimeMillis();

// Set the flow and its failed or cancelled nodes to PENDING_RESUME so that the flow will be resumed from the point before it failed
- DagProcUtils.setAndEmitFlowEvent(eventSubmitter, failedDag.get(), TimingEvent.FlowTimings.FLOW_PENDING_RESUME);
+ DagProcUtils.setAndEmitFlowEvent(failedDag.get(), TimingEvent.FlowTimings.FLOW_PENDING_RESUME);

for (Dag.DagNode<JobExecutionPlan> node : failedDag.get().getNodes()) {
ExecutionStatus executionStatus = node.getValue().getExecutionStatus();

@@ -141,6 +141,9 @@ public org.apache.gobblin.configuration.State parseJobStatus(GobblinTrackingEven
case TimingEvent.LauncherTimings.JOB_PENDING:
properties.put(JobStatusRetriever.EVENT_NAME_FIELD, ExecutionStatus.PENDING.name());
break;
+ case TimingEvent.LauncherTimings.JOB_SKIPPED:
+   properties.put(JobStatusRetriever.EVENT_NAME_FIELD, ExecutionStatus.SKIPPED.name());
+   break;
case TimingEvent.FlowTimings.FLOW_PENDING_RESUME:
case TimingEvent.LauncherTimings.JOB_PENDING_RESUME:
properties.put(JobStatusRetriever.EVENT_NAME_FIELD, ExecutionStatus.PENDING_RESUME.name());
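In isolation, the new mapping reduces to the check the testProcessMessageForSkippedEvent test above performs — a sketch reusing that test's createGTE helper:

GobblinTrackingEvent skipped = createGTE(TimingEvent.LauncherTimings.JOB_SKIPPED, Maps.newHashMap());
State state = jobStatusMonitor.parseJobStatus(skipped);
Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.SKIPPED.name());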

@@ -115,8 +115,8 @@ public abstract class KafkaJobStatusMonitor extends HighLevelConsumer<byte[], byte[]> {
KAFKA_AUTO_OFFSET_RESET_KEY, KAFKA_AUTO_OFFSET_RESET_SMALLEST));

private static final List<ExecutionStatus> ORDERED_EXECUTION_STATUSES = ImmutableList
- .of(ExecutionStatus.COMPILED, ExecutionStatus.PENDING, ExecutionStatus.PENDING_RESUME, ExecutionStatus.PENDING_RETRY,
- ExecutionStatus.ORCHESTRATED, ExecutionStatus.RUNNING, ExecutionStatus.COMPLETE,
+ .of(ExecutionStatus.COMPILED, ExecutionStatus.PENDING, ExecutionStatus.SKIPPED, ExecutionStatus.PENDING_RESUME,
+ ExecutionStatus.PENDING_RETRY, ExecutionStatus.ORCHESTRATED, ExecutionStatus.RUNNING, ExecutionStatus.COMPLETE,
ExecutionStatus.FAILED, ExecutionStatus.CANCELLED);

private final JobIssueEventHandler jobIssueEventHandler;
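SKIPPED is slotted just after PENDING in this ordered list. Assuming the list serves its usual role here — ranking statuses so that a late-arriving, lower-ranked Kafka event cannot overwrite a more advanced one — a stale SKIPPED update would not clobber RUNNING or a terminal status. Illustrative only, not the monitor's actual guard:

static boolean isOutOfOrder(ExecutionStatus current, ExecutionStatus incoming) {
  // drop events whose status ranks below the one already recorded
  return ORDERED_EXECUTION_STATUSES.indexOf(incoming) < ORDERED_EXECUTION_STATUSES.indexOf(current);
}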

@@ -19,9 +19,9 @@

import java.io.IOException;
import java.net.URISyntaxException;
+ import java.util.HashSet;
import java.util.List;
import java.util.Optional;
- import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;

@@ -186,7 +186,8 @@ public void killDagNode() throws IOException, URISyntaxException, InterruptedException {
message("Test message").eventName(ExecutionStatus.COMPLETE.name()).startTime(flowExecutionId).shouldRetry(false).orchestratedTime(flowExecutionId).build();

doReturn(Optional.of(dag)).when(dagManagementStateStore).getDag(any());
- doReturn(new ImmutablePair<>(Optional.of(dag.getStartNodes().get(0)), Optional.of(jobStatus))).when(dagManagementStateStore).getDagNodeWithJobStatus(any());
+ // third node (job2) will be queried for by the KillDagProc because we are killing that node
+ doReturn(new ImmutablePair<>(Optional.of(dag.getNodes().get(2)), Optional.of(jobStatus))).when(dagManagementStateStore).getDagNodeWithJobStatus(any());
doReturn(com.google.common.base.Optional.of(dag)).when(flowCompilationValidationHelper).createExecutionPlanIfValid(any());

LaunchDagProc launchDagProc = new LaunchDagProc(new LaunchDagTask(new DagActionStore.DagAction("fg", "flow2",
@@ -215,8 +216,6 @@ public void killDagNode() throws IOException, URISyntaxException, InterruptedException {
.getInvocations()
.stream()
.filter(a -> a.getMethod().getName().equals("cancelJob"))
- .filter(a -> ((Properties) a.getArgument(1))
- .getProperty(ConfigurationKeys.SPEC_PRODUCER_SERIALIZED_FUTURE).equals(MockedSpecExecutor.dummySerializedFuture))
.count())
.sum();
// kill dag proc tries to cancel only the exact dag node that was provided
@@ -226,5 +225,20 @@ public void killDagNode() throws IOException, URISyntaxException, InterruptedException {
.submit(eq(TimingEvent.LauncherTimings.JOB_CANCEL), anyMap());
Mockito.verify(this.mockedEventSubmitter, Mockito.times(numOfCancelledFlows))
.submit(eq(TimingEvent.FlowTimings.FLOW_CANCELLED), anyMap());

+ Assert.assertEquals(dag.getNodes().get(0).getValue().getExecutionStatus(), ExecutionStatus.ORCHESTRATED); // because this was a mocked dag and we launched this job
+ Assert.assertEquals(dag.getNodes().get(1).getValue().getExecutionStatus(), ExecutionStatus.PENDING); // because this was a mocked dag and we did not launch the job
+ Assert.assertEquals(dag.getNodes().get(2).getValue().getExecutionStatus(), ExecutionStatus.CANCELLED); // because we cancelled this job
+ Assert.assertEquals(dag.getNodes().get(3).getValue().getExecutionStatus(), ExecutionStatus.PENDING); // because this was a mocked dag and we did not launch the job
+ Assert.assertEquals(dag.getNodes().get(4).getValue().getExecutionStatus(), ExecutionStatus.SKIPPED); // because its parent job was cancelled
+
+ Assert.assertTrue(dagManagementStateStore.hasRunningJobs(dagId));
+
+ dag.getNodes().get(0).getValue().setExecutionStatus(ExecutionStatus.COMPLETE);
+ dag.getNodes().get(1).getValue().setExecutionStatus(ExecutionStatus.COMPLETE);
+ dag.getNodes().get(3).getValue().setExecutionStatus(ExecutionStatus.COMPLETE);
+
+ doReturn(new HashSet<>(dag.getNodes())).when(dagManagementStateStore).getDagNodes(any());
+ Assert.assertFalse(dagManagementStateStore.hasRunningJobs(dagId));
}
}
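The closing assertions pin down the contract implied for hasRunningJobs: a dag still has running jobs while any node sits in a non-terminal status, with CANCELLED and SKIPPED counting as terminal. One way to restate that contract (illustrative, not the store's implementation):

boolean hasRunningJobs = dag.getNodes().stream()
    .map(node -> node.getValue().getExecutionStatus().name())
    .anyMatch(status -> !FlowStatusGenerator.FINISHED_STATUSES.contains(status));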