Skip to content

Commit

Permalink
respond to comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Urmi Mustafi committed Aug 4, 2023
1 parent 5fcb86b commit 41de9f6
Show file tree
Hide file tree
Showing 8 changed files with 156 additions and 159 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
import org.apache.gobblin.service.modules.restli.GobblinServiceFlowConfigV2ResourceHandlerWithWarmStandby;
import org.apache.gobblin.service.modules.restli.GobblinServiceFlowExecutionResourceHandlerWithWarmStandby;
import org.apache.gobblin.service.modules.utils.SharedFlowMetricsContainer;
import org.apache.gobblin.service.modules.utils.SharedFlowMetricsSingleton;
import org.apache.gobblin.service.monitoring.DagActionStoreChangeMonitor;
import org.apache.gobblin.service.monitoring.DagActionStoreChangeMonitorFactory;
import org.apache.gobblin.service.monitoring.GitConfigMonitor;
Expand Down Expand Up @@ -187,7 +187,7 @@ public void configure(Binder binder) {
binder.bind(RequesterService.class)
.to(NoopRequesterService.class);

binder.bind(SharedFlowMetricsContainer.class);
binder.bind(SharedFlowMetricsSingleton.class);

OptionalBinder.newOptionalBinder(binder, TopologyCatalog.class);
if (serviceConfig.isTopologyCatalogEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,20 @@

package org.apache.gobblin.service.modules.orchestration;

import com.codahale.metrics.Meter;
import com.codahale.metrics.Timer;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.eventbus.Subscribe;
import com.google.common.util.concurrent.AbstractIdleService;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigException;
import com.typesafe.config.ConfigFactory;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
Expand All @@ -40,26 +52,9 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import com.codahale.metrics.Meter;
import com.codahale.metrics.Timer;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.eventbus.Subscribe;
import com.google.common.util.concurrent.AbstractIdleService;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigException;
import com.typesafe.config.ConfigFactory;

import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

import org.apache.commons.lang3.reflect.ConstructorUtils;
import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.instrumented.Instrumented;
Expand All @@ -83,8 +78,8 @@
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.flowgraph.Dag.DagNode;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
import org.apache.gobblin.service.modules.utils.ExecutionChecksUtil;
import org.apache.gobblin.service.modules.utils.SharedFlowMetricsContainer;
import org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
import org.apache.gobblin.service.modules.utils.SharedFlowMetricsSingleton;
import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
import org.apache.gobblin.service.monitoring.JobStatus;
import org.apache.gobblin.service.monitoring.JobStatusRetriever;
Expand Down Expand Up @@ -216,9 +211,10 @@ public String toString() {
private final UserQuotaManager quotaManager;
private final ClassAliasResolver<SpecCompiler> aliasResolver;
private final SpecCompiler specCompiler;
private final SharedFlowMetricsContainer sharedFlowMetricsContainer;
private final boolean flowConcurrencyFlag;
private final SharedFlowMetricsSingleton sharedFlowMetricsSingleton;
private final boolean isFlowConcurrencyEnabled;
private final FlowCatalog flowCatalog;
private final FlowCompilationValidationHelper flowCompilationValidationHelper;
private final Config config;
private final Optional<EventSubmitter> eventSubmitter;
private final long failedDagRetentionTime;
Expand All @@ -230,7 +226,7 @@ public String toString() {
private volatile boolean isActive = false;

public DagManager(Config config, JobStatusRetriever jobStatusRetriever,
SharedFlowMetricsContainer sharedFlowMetricsContainer, FlowStatusGenerator flowStatusGenerator,
SharedFlowMetricsSingleton sharedFlowMetricsSingleton, FlowStatusGenerator flowStatusGenerator,
FlowCatalog flowCatalog, boolean instrumentationEnabled) {
this.config = config;
this.numThreads = ConfigUtils.getInt(config, NUM_THREADS_KEY, DEFAULT_NUM_THREADS);
Expand All @@ -254,25 +250,18 @@ public DagManager(Config config, JobStatusRetriever jobStatusRetriever,
this.jobStatusRetriever = jobStatusRetriever;
this.aliasResolver = new ClassAliasResolver<>(SpecCompiler.class);
this.flowStatusGenerator = flowStatusGenerator;
try {
String specCompilerClassName = ServiceConfigKeys.DEFAULT_GOBBLIN_SERVICE_FLOWCOMPILER_CLASS;
if (config.hasPath(ServiceConfigKeys.GOBBLIN_SERVICE_FLOWCOMPILER_CLASS_KEY)) {
specCompilerClassName = config.getString(ServiceConfigKeys.GOBBLIN_SERVICE_FLOWCOMPILER_CLASS_KEY);
}
log.info("Using specCompiler class name/alias " + specCompilerClassName);

this.specCompiler = (SpecCompiler) ConstructorUtils.invokeConstructor(Class.forName(this.aliasResolver.resolve(specCompilerClassName)), config);
} catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException | InstantiationException |
ClassNotFoundException e) {
throw new RuntimeException(e);
}
this.flowConcurrencyFlag = ConfigUtils.getBoolean(config, ServiceConfigKeys.FLOW_CONCURRENCY_ALLOWED,
this.specCompiler = GobblinConstructorUtils.invokeConstructor(SpecCompiler.class, ConfigUtils.getString(config,
ServiceConfigKeys.GOBBLIN_SERVICE_FLOWCOMPILER_CLASS_KEY,
ServiceConfigKeys.DEFAULT_GOBBLIN_SERVICE_FLOWCOMPILER_CLASS), config);
this.isFlowConcurrencyEnabled = ConfigUtils.getBoolean(config, ServiceConfigKeys.FLOW_CONCURRENCY_ALLOWED,
ServiceConfigKeys.DEFAULT_FLOW_CONCURRENCY_ALLOWED);
this.quotaManager = GobblinConstructorUtils.invokeConstructor(UserQuotaManager.class,
ConfigUtils.getString(config, ServiceConfigKeys.QUOTA_MANAGER_CLASS, ServiceConfigKeys.DEFAULT_QUOTA_MANAGER),
config);
this.sharedFlowMetricsContainer = sharedFlowMetricsContainer;
this.sharedFlowMetricsSingleton = sharedFlowMetricsSingleton;
this.flowCatalog = flowCatalog;
this.flowCompilationValidationHelper = new FlowCompilationValidationHelper(sharedFlowMetricsSingleton, specCompiler,
quotaManager, eventSubmitter, flowStatusGenerator, isFlowConcurrencyEnabled);
TimeUnit timeUnit = TimeUnit.valueOf(ConfigUtils.getString(config, FAILED_DAG_RETENTION_TIME_UNIT, DEFAULT_FAILED_DAG_RETENTION_TIME_UNIT));
this.failedDagRetentionTime = timeUnit.toMillis(ConfigUtils.getLong(config, FAILED_DAG_RETENTION_TIME, DEFAULT_FAILED_DAG_RETENTION_TIME));
}
Expand All @@ -298,9 +287,9 @@ private static LinkedBlockingDeque<?>[] initializeDagQueue(int numThreads) {

/**
 * Injection constructor (annotated with {@code @Inject}): forwards all five arguments, in the same order, to the
 * six-argument {@code DagManager} constructor and supplies {@code true} for its trailing
 * {@code instrumentationEnabled} parameter.
 *
 * NOTE(review): this span is a diff rendering without +/- markers. The two adjacent parameter lines and the two
 * adjacent {@code this(...)} lines appear to be the before/after pair for the rename of
 * {@code SharedFlowMetricsContainer} to {@code SharedFlowMetricsSingleton}; presumably only the second line of
 * each pair exists after the commit - confirm against the repository.
 */
@Inject
public DagManager(Config config, JobStatusRetriever jobStatusRetriever,
SharedFlowMetricsContainer sharedFlowMetricsContainer, FlowStatusGenerator flowStatusGenerator,
SharedFlowMetricsSingleton sharedFlowMetricsSingleton, FlowStatusGenerator flowStatusGenerator,
FlowCatalog flowCatalog) {
this(config, jobStatusRetriever, sharedFlowMetricsContainer, flowStatusGenerator, flowCatalog, true);
this(config, jobStatusRetriever, sharedFlowMetricsSingleton, flowStatusGenerator, flowCatalog, true);
}

/** Do Nothing on service startup. Scheduling of {@link DagManagerThread}s and loading of any {@link Dag}s is done
Expand Down Expand Up @@ -516,8 +505,7 @@ public void handleLaunchFlowEvent(DagActionStore.DagAction action) {
URI flowUri = FlowSpec.Utils.createFlowSpecUri(flowId);
spec = (FlowSpec) flowCatalog.getSpecs(flowUri);
Optional<Dag<JobExecutionPlan>> optionalJobExecutionPlanDag =
ExecutionChecksUtil.handleChecksBeforeExecution(sharedFlowMetricsContainer, specCompiler, quotaManager,
eventSubmitter, flowStatusGenerator, log, flowConcurrencyFlag, spec);
this.flowCompilationValidationHelper.createExecutionPlanIfValid(spec);
if (optionalJobExecutionPlanDag.isPresent()) {
addDag(optionalJobExecutionPlanDag.get(), true, true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@
import org.apache.gobblin.service.modules.flow.SpecCompiler;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
import org.apache.gobblin.service.modules.utils.ExecutionChecksUtil;
import org.apache.gobblin.service.modules.utils.SharedFlowMetricsContainer;
import org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
import org.apache.gobblin.service.modules.utils.SharedFlowMetricsSingleton;
import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
import org.apache.gobblin.util.ClassAliasResolver;
import org.apache.gobblin.util.ConfigUtils;
Expand All @@ -86,7 +86,7 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
protected final MetricContext metricContext;

protected final Optional<EventSubmitter> eventSubmitter;
private final boolean flowConcurrencyFlag;
private final boolean isFlowConcurrencyEnabled;
@Getter
private Optional<Meter> flowOrchestrationSuccessFulMeter;
@Getter
Expand All @@ -97,22 +97,23 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
private FlowStatusGenerator flowStatusGenerator;

private UserQuotaManager quotaManager;
private final FlowCompilationValidationHelper flowCompilationValidationHelper;
private Optional<FlowTriggerHandler> flowTriggerHandler;
@Getter
private SharedFlowMetricsContainer sharedFlowMetricsContainer;
private SharedFlowMetricsSingleton sharedFlowMetricsSingleton;

private final ClassAliasResolver<SpecCompiler> aliasResolver;

public Orchestrator(Config config, Optional<TopologyCatalog> topologyCatalog, Optional<DagManager> dagManager,
Optional<Logger> log, FlowStatusGenerator flowStatusGenerator, boolean instrumentationEnabled,
Optional<FlowTriggerHandler> flowTriggerHandler, SharedFlowMetricsContainer sharedFlowMetricsContainer) {
Optional<FlowTriggerHandler> flowTriggerHandler, SharedFlowMetricsSingleton sharedFlowMetricsSingleton) {
_log = log.isPresent() ? log.get() : LoggerFactory.getLogger(getClass());
this.aliasResolver = new ClassAliasResolver<>(SpecCompiler.class);
this.topologyCatalog = topologyCatalog;
this.dagManager = dagManager;
this.flowStatusGenerator = flowStatusGenerator;
this.flowTriggerHandler = flowTriggerHandler;
this.sharedFlowMetricsContainer = sharedFlowMetricsContainer;
this.sharedFlowMetricsSingleton = sharedFlowMetricsSingleton;
try {
String specCompilerClassName = ServiceConfigKeys.DEFAULT_GOBBLIN_SERVICE_FLOWCOMPILER_CLASS;
if (config.hasPath(ServiceConfigKeys.GOBBLIN_SERVICE_FLOWCOMPILER_CLASS_KEY)) {
Expand Down Expand Up @@ -144,19 +145,21 @@ public Orchestrator(Config config, Optional<TopologyCatalog> topologyCatalog, Op
this.flowOrchestrationTimer = Optional.absent();
this.eventSubmitter = Optional.absent();
}
this.flowConcurrencyFlag = ConfigUtils.getBoolean(config, ServiceConfigKeys.FLOW_CONCURRENCY_ALLOWED,
this.isFlowConcurrencyEnabled = ConfigUtils.getBoolean(config, ServiceConfigKeys.FLOW_CONCURRENCY_ALLOWED,
ServiceConfigKeys.DEFAULT_FLOW_CONCURRENCY_ALLOWED);
quotaManager = GobblinConstructorUtils.invokeConstructor(UserQuotaManager.class,
ConfigUtils.getString(config, ServiceConfigKeys.QUOTA_MANAGER_CLASS, ServiceConfigKeys.DEFAULT_QUOTA_MANAGER),
config);
this.flowCompilationValidationHelper = new FlowCompilationValidationHelper(sharedFlowMetricsSingleton, specCompiler,
quotaManager, eventSubmitter, flowStatusGenerator, isFlowConcurrencyEnabled);
}

/**
 * Injection constructor (annotated with {@code @Inject}): delegates to the eight-argument {@code Orchestrator}
 * constructor, supplying {@code true} for its {@code instrumentationEnabled} parameter.
 *
 * Note that the argument order differs between the two constructors: here {@code flowStatusGenerator} is the
 * second parameter, while the delegate takes it after {@code log}, so the {@code this(...)} call reorders the
 * arguments rather than passing them straight through.
 *
 * NOTE(review): this span is a diff rendering without +/- markers. The adjacent
 * {@code SharedFlowMetricsContainer} / {@code SharedFlowMetricsSingleton} lines appear to be the before/after
 * pair for the rename; presumably only the {@code SharedFlowMetricsSingleton} lines exist after the commit -
 * confirm against the repository.
 */
@Inject
public Orchestrator(Config config, FlowStatusGenerator flowStatusGenerator, Optional<TopologyCatalog> topologyCatalog,
Optional<DagManager> dagManager, Optional<Logger> log, Optional<FlowTriggerHandler> flowTriggerHandler,
SharedFlowMetricsContainer sharedFlowMetricsContainer) {
SharedFlowMetricsSingleton sharedFlowMetricsSingleton) {
this(config, topologyCatalog, dagManager, log, flowStatusGenerator, true, flowTriggerHandler,
sharedFlowMetricsContainer);
sharedFlowMetricsSingleton);
}


Expand Down Expand Up @@ -230,10 +233,11 @@ public void orchestrate(Spec spec, Properties jobProps, long triggerTimestampMil
String flowGroup = flowConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
String flowName = flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);

sharedFlowMetricsContainer.addFlowGauge(spec, flowConfig, flowName, flowGroup);

if (!ExecutionChecksUtil.isExecutionPermittedHandler(sharedFlowMetricsContainer, specCompiler, quotaManager,
eventSubmitter, flowStatusGenerator, _log, flowConcurrencyFlag, flowConfig, spec, flowName, flowGroup)) {
sharedFlowMetricsSingleton.addFlowGauge(spec, flowConfig, flowGroup, flowName);
Optional<Dag<JobExecutionPlan>> jobExecutionPlanDagOptional =
this.flowCompilationValidationHelper.validateAndHandleConcurrentExecution(flowConfig, spec, flowGroup,
flowName);
if (!jobExecutionPlanDagOptional.isPresent()) {
return;
}
Map<String, String> flowMetadata = TimingEventUtils.getFlowMetadata((FlowSpec) spec);
Expand All @@ -260,21 +264,19 @@ public void orchestrate(Spec spec, Properties jobProps, long triggerTimestampMil
_log.info("Multi-active scheduler finished handling trigger event: [{}, triggerEventTimestamp: {}]", flowAction,
triggerTimestampMillis);
} else {
// Compile flow spec and do corresponding checks
Dag<JobExecutionPlan> jobExecutionPlanDag = specCompiler.compileFlow(spec);
Optional<TimingEvent> flowCompilationTimer =
this.eventSubmitter.transform(submitter -> new TimingEvent(submitter, TimingEvent.FlowTimings.FLOW_COMPILED));

Dag<JobExecutionPlan> jobExecutionPlanDag = jobExecutionPlanDagOptional.get();
if (jobExecutionPlanDag == null || jobExecutionPlanDag.isEmpty()) {
ExecutionChecksUtil.populateFlowCompilationFailedEventMessage(eventSubmitter, spec, flowMetadata);
FlowCompilationValidationHelper.populateFlowCompilationFailedEventMessage(eventSubmitter, spec, flowMetadata);
Instrumented.markMeter(this.flowOrchestrationFailedMeter);
sharedFlowMetricsContainer.conditionallyUpdateFlowGaugeSpecState(spec, SharedFlowMetricsContainer.CompiledState.FAILED);
sharedFlowMetricsSingleton.conditionallyUpdateFlowGaugeSpecState(spec, SharedFlowMetricsSingleton.CompiledState.FAILED);
_log.warn("Cannot determine an executor to run on for Spec: " + spec);
return;
}
sharedFlowMetricsContainer.conditionallyUpdateFlowGaugeSpecState(spec, SharedFlowMetricsContainer.CompiledState.SUCCESSFUL);
sharedFlowMetricsSingleton.conditionallyUpdateFlowGaugeSpecState(spec, SharedFlowMetricsSingleton.CompiledState.SUCCESSFUL);

ExecutionChecksUtil.addFlowExecutionIdIfAbsent(flowMetadata, jobExecutionPlanDag);
FlowCompilationValidationHelper.addFlowExecutionIdIfAbsent(flowMetadata, jobExecutionPlanDag);
if (flowCompilationTimer.isPresent()) {
flowCompilationTimer.get().stop(flowMetadata);
}
Expand Down Expand Up @@ -326,8 +328,7 @@ public void orchestrate(Spec spec, Properties jobProps, long triggerTimestampMil

public void submitFlowToDagManager(FlowSpec flowSpec) throws IOException, InterruptedException {
Optional<Dag<JobExecutionPlan>> optionalJobExecutionPlanDag =
ExecutionChecksUtil.handleChecksBeforeExecution(sharedFlowMetricsContainer, specCompiler, quotaManager,
eventSubmitter, flowStatusGenerator, _log, flowConcurrencyFlag, flowSpec);
this.flowCompilationValidationHelper.createExecutionPlanIfValid(flowSpec);
if (optionalJobExecutionPlanDag.isPresent()) {
submitFlowToDagManager(flowSpec, optionalJobExecutionPlanDag.get());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@

import org.apache.commons.lang.StringUtils;
import org.apache.gobblin.runtime.api.SpecNotFoundException;
import org.apache.gobblin.service.modules.utils.SharedFlowMetricsContainer;
import org.apache.gobblin.service.modules.utils.SharedFlowMetricsSingleton;
import org.apache.helix.HelixManager;
import org.quartz.CronExpression;
import org.quartz.DisallowConcurrentExecution;
Expand Down Expand Up @@ -172,7 +172,6 @@ public GobblinServiceJobScheduler(@Named(InjectionNames.SERVICE_NAME) String ser
Orchestrator orchestrator, SchedulerService schedulerService, Optional<UserQuotaManager> quotaManager, Optional<Logger> log,
@Named(InjectionNames.WARM_STANDBY_ENABLED) boolean warmStandbyEnabled,
Optional<FlowTriggerHandler> flowTriggerHandler) throws Exception {
// TODO: do i need to initialize sharedFlowMetricsContainer here?
super(ConfigUtils.configToProperties(config), schedulerService);

_log = log.isPresent() ? log.get() : LoggerFactory.getLogger(getClass());
Expand Down Expand Up @@ -211,11 +210,11 @@ public GobblinServiceJobScheduler(String serviceName, Config config, FlowStatusG
Optional<HelixManager> helixManager, Optional<FlowCatalog> flowCatalog, Optional<TopologyCatalog> topologyCatalog,
Optional<DagManager> dagManager, Optional<UserQuotaManager> quotaManager, SchedulerService schedulerService,
Optional<Logger> log, boolean warmStandbyEnabled, Optional <FlowTriggerHandler> flowTriggerHandler,
SharedFlowMetricsContainer sharedFlowMetricsContainer)
SharedFlowMetricsSingleton sharedFlowMetricsSingleton)
throws Exception {
this(serviceName, config, helixManager, flowCatalog, topologyCatalog,
new Orchestrator(config, flowStatusGenerator, topologyCatalog, dagManager, log, flowTriggerHandler,
sharedFlowMetricsContainer),
sharedFlowMetricsSingleton),
schedulerService, quotaManager, log, warmStandbyEnabled, flowTriggerHandler);
}

Expand Down
Loading

0 comments on commit 41de9f6

Please sign in to comment.