diff --git a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java index 94e2a82c6c0..9e9ab1e69b2 100644 --- a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java +++ b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java @@ -34,6 +34,11 @@ public class ServiceMetricNames { public static final String FLOW_ORCHESTRATION_TIMER = GOBBLIN_SERVICE_PREFIX + ".flowOrchestration.time"; public static final String FLOW_ORCHESTRATION_DELAY = GOBBLIN_SERVICE_PREFIX + ".flowOrchestration.delay"; + // Flow Trigger Handler Lease Status Counts + public static final String FLOW_TRIGGER_HANDLER_LEASE_OBTAINED_COUNT = GOBBLIN_SERVICE_PREFIX + ".flowTriggerHandler.leaseObtained"; + public static final String FLOW_TRIGGER_HANDLER_LEASED_TO_ANOTHER_COUNT = GOBBLIN_SERVICE_PREFIX + ".flowTriggerHandler.leasedToAnother"; + public static final String FLOW_TRIGGER_HANDLER_NO_LONGER_LEASING_COUNT = GOBBLIN_SERVICE_PREFIX + ".flowTriggerHandler.noLongerLeasing"; + //Job status poll timer public static final String JOB_STATUS_POLLED_TIMER = GOBBLIN_SERVICE_PREFIX + ".jobStatusPoll.time"; diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java index ec63276c4c5..7fdee038414 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java @@ -38,8 +38,10 @@ import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.instrumented.Instrumented; +import org.apache.gobblin.metrics.ContextAwareCounter; import org.apache.gobblin.metrics.ContextAwareMeter; import org.apache.gobblin.metrics.MetricContext; +import org.apache.gobblin.metrics.ServiceMetricNames; import org.apache.gobblin.runtime.api.DagActionStore; import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter; import org.apache.gobblin.runtime.api.MysqlMultiActiveLeaseArbiter; @@ -69,6 +71,12 @@ public class FlowTriggerHandler { private MetricContext metricContext; private ContextAwareMeter numFlowsSubmitted; + private ContextAwareCounter leaseObtainedCount; + + private ContextAwareCounter leasedToAnotherStatusCount; + + private ContextAwareCounter noLongerLeasingStatusCount; + @Inject public FlowTriggerHandler(Config config, Optional leaseDeterminationStore, SchedulerService schedulerService, Optional dagActionStore) { @@ -80,6 +88,9 @@ public FlowTriggerHandler(Config config, Optional lease this.metricContext = Instrumented.getMetricContext(new org.apache.gobblin.configuration.State(ConfigUtils.configToProperties(config)), this.getClass()); this.numFlowsSubmitted = metricContext.contextAwareMeter(RuntimeMetrics.GOBBLIN_FLOW_TRIGGER_HANDLER_NUM_FLOWS_SUBMITTED); + this.leaseObtainedCount = this.metricContext.contextAwareCounter(ServiceMetricNames.FLOW_TRIGGER_HANDLER_LEASE_OBTAINED_COUNT); + this.leasedToAnotherStatusCount = this.metricContext.contextAwareCounter(ServiceMetricNames.FLOW_TRIGGER_HANDLER_LEASED_TO_ANOTHER_COUNT); + this.noLongerLeasingStatusCount = this.metricContext.contextAwareCounter(ServiceMetricNames.FLOW_TRIGGER_HANDLER_NO_LONGER_LEASING_COUNT); } /** @@ -94,10 +105,12 @@ public void handleTriggerEvent(Properties jobProps, DagActionStore.DagAction flo throws IOException { if (multiActiveLeaseArbiter.isPresent()) { MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus = multiActiveLeaseArbiter.get().tryAcquireLease(flowAction, eventTimeMillis); - // TODO: add a log event or metric for each of these cases if (leaseAttemptStatus instanceof MultiActiveLeaseArbiter.LeaseObtainedStatus) { MultiActiveLeaseArbiter.LeaseObtainedStatus leaseObtainedStatus = (MultiActiveLeaseArbiter.LeaseObtainedStatus) leaseAttemptStatus; + this.leaseObtainedCount.inc(); if (persistFlowAction(leaseObtainedStatus)) { + log.info("Successfully persisted lease: [%s, eventTimestamp: %s] ", leaseObtainedStatus.getFlowAction(), + leaseObtainedStatus.getEventTimestamp()); return; } // If persisting the flow action failed, then we set another trigger for this event to occur immediately to @@ -107,10 +120,14 @@ public void handleTriggerEvent(Properties jobProps, DagActionStore.DagAction flo eventTimeMillis); return; } else if (leaseAttemptStatus instanceof MultiActiveLeaseArbiter.LeasedToAnotherStatus) { + this.leasedToAnotherStatusCount.inc(); scheduleReminderForEvent(jobProps, (MultiActiveLeaseArbiter.LeasedToAnotherStatus) leaseAttemptStatus, eventTimeMillis); return; } else if (leaseAttemptStatus instanceof MultiActiveLeaseArbiter.NoLongerLeasingStatus) { + this.noLongerLeasingStatusCount.inc(); + log.debug("Received type of leaseAttemptStatus: [%s, eventTimestamp: %s] ", leaseAttemptStatus.getClass().getName(), + eventTimeMillis); return; } throw new RuntimeException(String.format("Received type of leaseAttemptStatus: %s not handled by this method",