Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GOBBLIN- 1856] Add flow trigger handler leasing metrics #3717

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ public class ServiceMetricNames {
public static final String FLOW_ORCHESTRATION_TIMER = GOBBLIN_SERVICE_PREFIX + ".flowOrchestration.time";
public static final String FLOW_ORCHESTRATION_DELAY = GOBBLIN_SERVICE_PREFIX + ".flowOrchestration.delay";

// Flow Trigger Handler Lease Status Counts
public static final String FLOW_TRIGGER_HANDLER_LEASE_OBTAINED_COUNT = GOBBLIN_SERVICE_PREFIX + ".flowTriggerHandler.leaseObtained";
public static final String FLOW_TRIGGER_HANDLER_LEASED_TO_ANOTHER_COUNT = GOBBLIN_SERVICE_PREFIX + ".flowTriggerHandler.leasedToAnother";
public static final String FLOW_TRIGGER_HANDLER_NO_LONGER_LEASING_COUNT = GOBBLIN_SERVICE_PREFIX + ".flowTriggerHandler.noLongerLeasing";

//Job status poll timer
public static final String JOB_STATUS_POLLED_TIMER = GOBBLIN_SERVICE_PREFIX + ".jobStatusPoll.time";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,10 @@

import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.metrics.ContextAwareCounter;
import org.apache.gobblin.metrics.ContextAwareMeter;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.ServiceMetricNames;
import org.apache.gobblin.runtime.api.DagActionStore;
import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
import org.apache.gobblin.runtime.api.MysqlMultiActiveLeaseArbiter;
Expand Down Expand Up @@ -69,6 +71,12 @@ public class FlowTriggerHandler {
private MetricContext metricContext;
private ContextAwareMeter numFlowsSubmitted;

private ContextAwareCounter leaseObtainedCount;

private ContextAwareCounter leasedToAnotherStatusCount;

private ContextAwareCounter noLongerLeasingStatusCount;

@Inject
public FlowTriggerHandler(Config config, Optional<MultiActiveLeaseArbiter> leaseDeterminationStore,
SchedulerService schedulerService, Optional<DagActionStore> dagActionStore) {
Expand All @@ -80,6 +88,9 @@ public FlowTriggerHandler(Config config, Optional<MultiActiveLeaseArbiter> lease
this.metricContext = Instrumented.getMetricContext(new org.apache.gobblin.configuration.State(ConfigUtils.configToProperties(config)),
this.getClass());
this.numFlowsSubmitted = metricContext.contextAwareMeter(RuntimeMetrics.GOBBLIN_FLOW_TRIGGER_HANDLER_NUM_FLOWS_SUBMITTED);
this.leaseObtainedCount = this.metricContext.contextAwareCounter(ServiceMetricNames.FLOW_TRIGGER_HANDLER_LEASE_OBTAINED_COUNT);
this.leasedToAnotherStatusCount = this.metricContext.contextAwareCounter(ServiceMetricNames.FLOW_TRIGGER_HANDLER_LEASED_TO_ANOTHER_COUNT);
this.noLongerLeasingStatusCount = this.metricContext.contextAwareCounter(ServiceMetricNames.FLOW_TRIGGER_HANDLER_NO_LONGER_LEASING_COUNT);
}

/**
Expand All @@ -94,10 +105,12 @@ public void handleTriggerEvent(Properties jobProps, DagActionStore.DagAction flo
throws IOException {
if (multiActiveLeaseArbiter.isPresent()) {
MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus = multiActiveLeaseArbiter.get().tryAcquireLease(flowAction, eventTimeMillis);
// TODO: add a log event or metric for each of these cases
if (leaseAttemptStatus instanceof MultiActiveLeaseArbiter.LeaseObtainedStatus) {
MultiActiveLeaseArbiter.LeaseObtainedStatus leaseObtainedStatus = (MultiActiveLeaseArbiter.LeaseObtainedStatus) leaseAttemptStatus;
this.leaseObtainedCount.inc();
if (persistFlowAction(leaseObtainedStatus)) {
log.info("Successfully persisted lease: [%s, eventTimestamp: %s] ", leaseObtainedStatus.getFlowAction(),
leaseObtainedStatus.getEventTimestamp());
return;
}
// If persisting the flow action failed, then we set another trigger for this event to occur immediately to
Expand All @@ -107,10 +120,14 @@ public void handleTriggerEvent(Properties jobProps, DagActionStore.DagAction flo
eventTimeMillis);
return;
} else if (leaseAttemptStatus instanceof MultiActiveLeaseArbiter.LeasedToAnotherStatus) {
this.leasedToAnotherStatusCount.inc();
scheduleReminderForEvent(jobProps, (MultiActiveLeaseArbiter.LeasedToAnotherStatus) leaseAttemptStatus,
eventTimeMillis);
return;
} else if (leaseAttemptStatus instanceof MultiActiveLeaseArbiter.NoLongerLeasingStatus) {
this.noLongerLeasingStatusCount.inc();
log.debug("Received type of leaseAttemptStatus: [%s, eventTimestamp: %s] ", leaseAttemptStatus.getClass().getName(),
eventTimeMillis);
return;
}
throw new RuntimeException(String.format("Received type of leaseAttemptStatus: %s not handled by this method",
Expand Down
Loading