Skip to content

Commit

Permalink
add flow trigger handler leasing metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
Meeth Gala committed Jul 18, 2023
1 parent 2c2ffac commit d4a160e
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ public class ServiceMetricNames {
public static final String FLOW_ORCHESTRATION_TIMER = GOBBLIN_SERVICE_PREFIX + ".flowOrchestration.time";
public static final String FLOW_ORCHESTRATION_DELAY = GOBBLIN_SERVICE_PREFIX + ".flowOrchestration.delay";

// Flow Trigger Handler Lease Status Counts
public static final String FLOW_TRIGGER_HANDLER_LEASE_OBTAINED_COUNT = GOBBLIN_SERVICE_PREFIX + ".flowTriggerHandler.leaseObtained";
public static final String FLOW_TRIGGER_HANDLER_LEASED_TO_ANOTHER_COUNT = GOBBLIN_SERVICE_PREFIX + ".flowTriggerHandler.leasedToAnother";
public static final String FLOW_TRIGGER_HANDLER_NO_LONGER_LEASING_COUNT = GOBBLIN_SERVICE_PREFIX + ".flowTriggerHandler.noLongerLeasing";

//Job status poll timer
public static final String JOB_STATUS_POLLED_TIMER = GOBBLIN_SERVICE_PREFIX + ".jobStatusPoll.time";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.metrics.ContextAwareMeter;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.ServiceMetricNames;
import org.apache.gobblin.runtime.api.DagActionStore;
import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
import org.apache.gobblin.runtime.api.MysqlMultiActiveLeaseArbiter;
Expand Down Expand Up @@ -96,7 +97,10 @@ public void handleTriggerEvent(Properties jobProps, DagActionStore.DagAction flo
// TODO: add a log event or metric for each of these cases
if (leaseAttemptStatus instanceof MultiActiveLeaseArbiter.LeaseObtainedStatus) {
MultiActiveLeaseArbiter.LeaseObtainedStatus leaseObtainedStatus = (MultiActiveLeaseArbiter.LeaseObtainedStatus) leaseAttemptStatus;
this.metricContext.contextAwareCounter(ServiceMetricNames.FLOW_TRIGGER_HANDLER_LEASE_OBTAINED_COUNT);
if (persistFlowAction(leaseObtainedStatus)) {
log.info("Successfully persisted lease: [%s, eventTimestamp: %s] ", leaseObtainedStatus.getFlowAction(),
leaseObtainedStatus.getEventTimestamp());
return;
}
// If persisting the flow action failed, then we set another trigger for this event to occur immediately to
Expand All @@ -105,10 +109,12 @@ public void handleTriggerEvent(Properties jobProps, DagActionStore.DagAction flo
leaseObtainedStatus.getEventTimestamp(), 0L), eventTimeMillis);
return;
} else if (leaseAttemptStatus instanceof MultiActiveLeaseArbiter.LeasedToAnotherStatus) {
this.metricContext.contextAwareCounter(ServiceMetricNames.FLOW_TRIGGER_HANDLER_LEASED_TO_ANOTHER_COUNT);
scheduleReminderForEvent(jobProps, (MultiActiveLeaseArbiter.LeasedToAnotherStatus) leaseAttemptStatus,
eventTimeMillis);
return;
} else if (leaseAttemptStatus instanceof MultiActiveLeaseArbiter.NoLongerLeasingStatus) {
this.metricContext.contextAwareCounter(ServiceMetricNames.FLOW_TRIGGER_HANDLER_NO_LONGER_LEASING_COUNT);
return;
}
throw new RuntimeException(String.format("Received type of leaseAttemptStatus: %s not handled by this method",
Expand Down

0 comments on commit d4a160e

Please sign in to comment.