Skip to content

Commit

Permalink
[GOBBLIN- 1856] Add flow trigger handler leasing metrics (#3717)
Browse files Browse the repository at this point in the history
* add flow trigger handler leasing metrics

* remove todo and resolve merge conflicts

* address PR comments

---------

Co-authored-by: Meeth Gala <[email protected]>
  • Loading branch information
meethngala and Meeth Gala committed Jul 21, 2023
1 parent bea009e commit 0151890
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ public class ServiceMetricNames {
public static final String FLOW_ORCHESTRATION_TIMER = GOBBLIN_SERVICE_PREFIX + ".flowOrchestration.time";
public static final String FLOW_ORCHESTRATION_DELAY = GOBBLIN_SERVICE_PREFIX + ".flowOrchestration.delay";

// Flow Trigger Handler Lease Status Counts
public static final String FLOW_TRIGGER_HANDLER_LEASE_OBTAINED_COUNT = GOBBLIN_SERVICE_PREFIX + ".flowTriggerHandler.leaseObtained";
public static final String FLOW_TRIGGER_HANDLER_LEASED_TO_ANOTHER_COUNT = GOBBLIN_SERVICE_PREFIX + ".flowTriggerHandler.leasedToAnother";
public static final String FLOW_TRIGGER_HANDLER_NO_LONGER_LEASING_COUNT = GOBBLIN_SERVICE_PREFIX + ".flowTriggerHandler.noLongerLeasing";

//Job status poll timer
public static final String JOB_STATUS_POLLED_TIMER = GOBBLIN_SERVICE_PREFIX + ".jobStatusPoll.time";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,10 @@

import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.metrics.ContextAwareCounter;
import org.apache.gobblin.metrics.ContextAwareMeter;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.ServiceMetricNames;
import org.apache.gobblin.runtime.api.DagActionStore;
import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
import org.apache.gobblin.runtime.api.MysqlMultiActiveLeaseArbiter;
Expand Down Expand Up @@ -69,6 +71,12 @@ public class FlowTriggerHandler {
private MetricContext metricContext;
private ContextAwareMeter numFlowsSubmitted;

private ContextAwareCounter leaseObtainedCount;

private ContextAwareCounter leasedToAnotherStatusCount;

private ContextAwareCounter noLongerLeasingStatusCount;

@Inject
public FlowTriggerHandler(Config config, Optional<MultiActiveLeaseArbiter> leaseDeterminationStore,
SchedulerService schedulerService, Optional<DagActionStore> dagActionStore) {
Expand All @@ -80,6 +88,9 @@ public FlowTriggerHandler(Config config, Optional<MultiActiveLeaseArbiter> lease
this.metricContext = Instrumented.getMetricContext(new org.apache.gobblin.configuration.State(ConfigUtils.configToProperties(config)),
this.getClass());
this.numFlowsSubmitted = metricContext.contextAwareMeter(RuntimeMetrics.GOBBLIN_FLOW_TRIGGER_HANDLER_NUM_FLOWS_SUBMITTED);
this.leaseObtainedCount = this.metricContext.contextAwareCounter(ServiceMetricNames.FLOW_TRIGGER_HANDLER_LEASE_OBTAINED_COUNT);
this.leasedToAnotherStatusCount = this.metricContext.contextAwareCounter(ServiceMetricNames.FLOW_TRIGGER_HANDLER_LEASED_TO_ANOTHER_COUNT);
this.noLongerLeasingStatusCount = this.metricContext.contextAwareCounter(ServiceMetricNames.FLOW_TRIGGER_HANDLER_NO_LONGER_LEASING_COUNT);
}

/**
Expand All @@ -94,10 +105,12 @@ public void handleTriggerEvent(Properties jobProps, DagActionStore.DagAction flo
throws IOException {
if (multiActiveLeaseArbiter.isPresent()) {
MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus = multiActiveLeaseArbiter.get().tryAcquireLease(flowAction, eventTimeMillis);
// TODO: add a log event or metric for each of these cases
if (leaseAttemptStatus instanceof MultiActiveLeaseArbiter.LeaseObtainedStatus) {
MultiActiveLeaseArbiter.LeaseObtainedStatus leaseObtainedStatus = (MultiActiveLeaseArbiter.LeaseObtainedStatus) leaseAttemptStatus;
this.leaseObtainedCount.inc();
if (persistFlowAction(leaseObtainedStatus)) {
log.info("Successfully persisted lease: [%s, eventTimestamp: %s] ", leaseObtainedStatus.getFlowAction(),
leaseObtainedStatus.getEventTimestamp());
return;
}
// If persisting the flow action failed, then we set another trigger for this event to occur immediately to
Expand All @@ -107,10 +120,14 @@ public void handleTriggerEvent(Properties jobProps, DagActionStore.DagAction flo
eventTimeMillis);
return;
} else if (leaseAttemptStatus instanceof MultiActiveLeaseArbiter.LeasedToAnotherStatus) {
this.leasedToAnotherStatusCount.inc();
scheduleReminderForEvent(jobProps, (MultiActiveLeaseArbiter.LeasedToAnotherStatus) leaseAttemptStatus,
eventTimeMillis);
return;
} else if (leaseAttemptStatus instanceof MultiActiveLeaseArbiter.NoLongerLeasingStatus) {
this.noLongerLeasingStatusCount.inc();
log.debug("Received type of leaseAttemptStatus: [%s, eventTimestamp: %s] ", leaseAttemptStatus.getClass().getName(),
eventTimeMillis);
return;
}
throw new RuntimeException(String.format("Received type of leaseAttemptStatus: %s not handled by this method",
Expand Down

0 comments on commit 0151890

Please sign in to comment.