diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java index 90379e730f..410350b885 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java @@ -104,6 +104,8 @@ public FlowTriggerHandler(Config config, Optional lease public void handleTriggerEvent(Properties jobProps, DagActionStore.DagAction flowAction, long eventTimeMillis) throws IOException { if (multiActiveLeaseArbiter.isPresent()) { + log.info("Multi-active scheduler about to handle trigger event: [{}, triggerEventTimestamp: {}]", flowAction, + eventTimeMillis); MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus = multiActiveLeaseArbiter.get().tryAcquireLease(flowAction, eventTimeMillis); if (leaseAttemptStatus instanceof MultiActiveLeaseArbiter.LeaseObtainedStatus) { MultiActiveLeaseArbiter.LeaseObtainedStatus leaseObtainedStatus = (MultiActiveLeaseArbiter.LeaseObtainedStatus) leaseAttemptStatus; diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java index 186b6d81c1..c0dc8b2090 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java @@ -244,6 +244,7 @@ public void orchestrate(Spec spec, Properties jobProps, long triggerTimestampMil return; } Map flowMetadata = TimingEventUtils.getFlowMetadata((FlowSpec) spec); + FlowCompilationValidationHelper.addFlowExecutionIdIfAbsent(flowMetadata, jobExecutionPlanDagOptional.get()); // If multi-active scheduler is enabled do not pass onto DagManager, otherwise scheduler forwards it directly // Skip flow compilation as well, since we recompile after receiving event from DagActionStoreChangeMonitor later