Skip to content

Commit

Permalink
address PR comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Meeth Gala committed Jul 19, 2023
1 parent 852e001 commit e27cdd1
Showing 1 changed file with 15 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@

import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.metrics.ContextAwareCounter;
import org.apache.gobblin.metrics.ContextAwareMeter;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.ServiceMetricNames;
Expand Down Expand Up @@ -70,6 +71,12 @@ public class FlowTriggerHandler {
private MetricContext metricContext;
private ContextAwareMeter numFlowsSubmitted;

private ContextAwareCounter leaseObtainedCount;

private ContextAwareCounter leasedToAnotherStatusCount;

private ContextAwareCounter noLongerLeasingStatusCount;

@Inject
public FlowTriggerHandler(Config config, Optional<MultiActiveLeaseArbiter> leaseDeterminationStore,
SchedulerService schedulerService, Optional<DagActionStore> dagActionStore) {
Expand All @@ -81,6 +88,9 @@ public FlowTriggerHandler(Config config, Optional<MultiActiveLeaseArbiter> lease
this.metricContext = Instrumented.getMetricContext(new org.apache.gobblin.configuration.State(ConfigUtils.configToProperties(config)),
this.getClass());
this.numFlowsSubmitted = metricContext.contextAwareMeter(RuntimeMetrics.GOBBLIN_FLOW_TRIGGER_HANDLER_NUM_FLOWS_SUBMITTED);
this.leaseObtainedCount = this.metricContext.contextAwareCounter(ServiceMetricNames.FLOW_TRIGGER_HANDLER_LEASE_OBTAINED_COUNT);
this.leasedToAnotherStatusCount = this.metricContext.contextAwareCounter(ServiceMetricNames.FLOW_TRIGGER_HANDLER_LEASED_TO_ANOTHER_COUNT);
this.noLongerLeasingStatusCount = this.metricContext.contextAwareCounter(ServiceMetricNames.FLOW_TRIGGER_HANDLER_NO_LONGER_LEASING_COUNT);
}

/**
Expand All @@ -97,7 +107,7 @@ public void handleTriggerEvent(Properties jobProps, DagActionStore.DagAction flo
MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus = multiActiveLeaseArbiter.get().tryAcquireLease(flowAction, eventTimeMillis);
if (leaseAttemptStatus instanceof MultiActiveLeaseArbiter.LeaseObtainedStatus) {
MultiActiveLeaseArbiter.LeaseObtainedStatus leaseObtainedStatus = (MultiActiveLeaseArbiter.LeaseObtainedStatus) leaseAttemptStatus;
this.metricContext.contextAwareCounter(ServiceMetricNames.FLOW_TRIGGER_HANDLER_LEASE_OBTAINED_COUNT);
this.leaseObtainedCount.inc();
if (persistFlowAction(leaseObtainedStatus)) {
log.info("Successfully persisted lease: [%s, eventTimestamp: %s] ", leaseObtainedStatus.getFlowAction(),
leaseObtainedStatus.getEventTimestamp());
Expand All @@ -110,12 +120,14 @@ public void handleTriggerEvent(Properties jobProps, DagActionStore.DagAction flo
eventTimeMillis);
return;
} else if (leaseAttemptStatus instanceof MultiActiveLeaseArbiter.LeasedToAnotherStatus) {
this.metricContext.contextAwareCounter(ServiceMetricNames.FLOW_TRIGGER_HANDLER_LEASED_TO_ANOTHER_COUNT);
this.leasedToAnotherStatusCount.inc();
scheduleReminderForEvent(jobProps, (MultiActiveLeaseArbiter.LeasedToAnotherStatus) leaseAttemptStatus,
eventTimeMillis);
return;
} else if (leaseAttemptStatus instanceof MultiActiveLeaseArbiter.NoLongerLeasingStatus) {
this.metricContext.contextAwareCounter(ServiceMetricNames.FLOW_TRIGGER_HANDLER_NO_LONGER_LEASING_COUNT);
this.noLongerLeasingStatusCount.inc();
log.debug("Received type of leaseAttemptStatus: [%s, eventTimestamp: %s] ", leaseAttemptStatus.getClass().getName(),
eventTimeMillis);
return;
}
throw new RuntimeException(String.format("Received type of leaseAttemptStatus: %s not handled by this method",
Expand Down

0 comments on commit e27cdd1

Please sign in to comment.