From 41de9f6f96768d29f8100a03519aeb8b2e3100b3 Mon Sep 17 00:00:00 2001 From: Urmi Mustafi Date: Fri, 4 Aug 2023 15:52:50 -0700 Subject: [PATCH] respond to comments --- .../core/GobblinServiceGuiceModule.java | 4 +- .../modules/orchestration/DagManager.java | 70 +++++++-------- .../modules/orchestration/Orchestrator.java | 45 +++++----- .../scheduler/GobblinServiceJobScheduler.java | 7 +- ...a => FlowCompilationValidationHelper.java} | 87 +++++++++++-------- ...r.java => SharedFlowMetricsSingleton.java} | 63 +++++++------- .../orchestration/DagManagerFlowTest.java | 4 +- .../orchestration/OrchestratorTest.java | 35 ++++---- 8 files changed, 156 insertions(+), 159 deletions(-) rename gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/{ExecutionChecksUtil.java => FlowCompilationValidationHelper.java} (73%) rename gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/{SharedFlowMetricsContainer.java => SharedFlowMetricsSingleton.java} (78%) diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java index 55c3894f7a7..af0c3461adf 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java @@ -27,7 +27,7 @@ import org.apache.gobblin.service.modules.orchestration.UserQuotaManager; import org.apache.gobblin.service.modules.restli.GobblinServiceFlowConfigV2ResourceHandlerWithWarmStandby; import org.apache.gobblin.service.modules.restli.GobblinServiceFlowExecutionResourceHandlerWithWarmStandby; -import org.apache.gobblin.service.modules.utils.SharedFlowMetricsContainer; +import org.apache.gobblin.service.modules.utils.SharedFlowMetricsSingleton; import org.apache.gobblin.service.monitoring.DagActionStoreChangeMonitor; import org.apache.gobblin.service.monitoring.DagActionStoreChangeMonitorFactory; import org.apache.gobblin.service.monitoring.GitConfigMonitor; @@ -187,7 +187,7 @@ public void configure(Binder binder) { binder.bind(RequesterService.class) .to(NoopRequesterService.class); - binder.bind(SharedFlowMetricsContainer.class); + binder.bind(SharedFlowMetricsSingleton.class); OptionalBinder.newOptionalBinder(binder, TopologyCatalog.class); if (serviceConfig.isTopologyCatalogEnabled()) { diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java index 8ed8da403a7..c59b8992380 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java @@ -17,8 +17,20 @@ package org.apache.gobblin.service.modules.orchestration; +import com.codahale.metrics.Meter; +import com.codahale.metrics.Timer; +import com.google.common.base.Joiner; +import com.google.common.base.Optional; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.eventbus.Subscribe; +import com.google.common.util.concurrent.AbstractIdleService; +import com.google.inject.Inject; +import com.google.inject.Singleton; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigException; +import com.typesafe.config.ConfigFactory; import java.io.IOException; -import java.lang.reflect.InvocationTargetException; import java.net.URI; import java.net.URISyntaxException; import java.util.ArrayList; @@ -40,26 +52,9 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; - -import com.codahale.metrics.Meter; -import com.codahale.metrics.Timer; -import com.google.common.base.Joiner; -import com.google.common.base.Optional; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.eventbus.Subscribe; -import com.google.common.util.concurrent.AbstractIdleService; -import com.google.inject.Inject; -import com.google.inject.Singleton; -import com.typesafe.config.Config; -import com.typesafe.config.ConfigException; -import com.typesafe.config.ConfigFactory; - import lombok.EqualsAndHashCode; import lombok.Getter; import lombok.extern.slf4j.Slf4j; - -import org.apache.commons.lang3.reflect.ConstructorUtils; import org.apache.gobblin.annotation.Alpha; import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.instrumented.Instrumented; @@ -83,8 +78,8 @@ import org.apache.gobblin.service.modules.flowgraph.Dag; import org.apache.gobblin.service.modules.flowgraph.Dag.DagNode; import org.apache.gobblin.service.modules.spec.JobExecutionPlan; -import org.apache.gobblin.service.modules.utils.ExecutionChecksUtil; -import org.apache.gobblin.service.modules.utils.SharedFlowMetricsContainer; +import org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper; +import org.apache.gobblin.service.modules.utils.SharedFlowMetricsSingleton; import org.apache.gobblin.service.monitoring.FlowStatusGenerator; import org.apache.gobblin.service.monitoring.JobStatus; import org.apache.gobblin.service.monitoring.JobStatusRetriever; @@ -216,9 +211,10 @@ public String toString() { private final UserQuotaManager quotaManager; private final ClassAliasResolver aliasResolver; private final SpecCompiler specCompiler; - private final SharedFlowMetricsContainer sharedFlowMetricsContainer; - private final boolean flowConcurrencyFlag; + private final SharedFlowMetricsSingleton sharedFlowMetricsSingleton; + private final boolean isFlowConcurrencyEnabled; private final FlowCatalog flowCatalog; + private final FlowCompilationValidationHelper flowCompilationValidationHelper; private final Config config; private final Optional eventSubmitter; private final long failedDagRetentionTime; @@ -230,7 +226,7 @@ public String toString() { private volatile boolean isActive = false; public DagManager(Config config, JobStatusRetriever jobStatusRetriever, - SharedFlowMetricsContainer sharedFlowMetricsContainer, FlowStatusGenerator flowStatusGenerator, + SharedFlowMetricsSingleton sharedFlowMetricsSingleton, FlowStatusGenerator flowStatusGenerator, FlowCatalog flowCatalog, boolean instrumentationEnabled) { this.config = config; this.numThreads = ConfigUtils.getInt(config, NUM_THREADS_KEY, DEFAULT_NUM_THREADS); @@ -254,25 +250,18 @@ public DagManager(Config config, JobStatusRetriever jobStatusRetriever, this.jobStatusRetriever = jobStatusRetriever; this.aliasResolver = new ClassAliasResolver<>(SpecCompiler.class); this.flowStatusGenerator = flowStatusGenerator; - try { - String specCompilerClassName = ServiceConfigKeys.DEFAULT_GOBBLIN_SERVICE_FLOWCOMPILER_CLASS; - if (config.hasPath(ServiceConfigKeys.GOBBLIN_SERVICE_FLOWCOMPILER_CLASS_KEY)) { - specCompilerClassName = config.getString(ServiceConfigKeys.GOBBLIN_SERVICE_FLOWCOMPILER_CLASS_KEY); - } - log.info("Using specCompiler class name/alias " + specCompilerClassName); - - this.specCompiler = (SpecCompiler) ConstructorUtils.invokeConstructor(Class.forName(this.aliasResolver.resolve(specCompilerClassName)), config); - } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException | InstantiationException | - ClassNotFoundException e) { - throw new RuntimeException(e); - } - this.flowConcurrencyFlag = ConfigUtils.getBoolean(config, ServiceConfigKeys.FLOW_CONCURRENCY_ALLOWED, + this.specCompiler = GobblinConstructorUtils.invokeConstructor(SpecCompiler.class, ConfigUtils.getString(config, + ServiceConfigKeys.GOBBLIN_SERVICE_FLOWCOMPILER_CLASS_KEY, + ServiceConfigKeys.DEFAULT_GOBBLIN_SERVICE_FLOWCOMPILER_CLASS), config); + this.isFlowConcurrencyEnabled = ConfigUtils.getBoolean(config, ServiceConfigKeys.FLOW_CONCURRENCY_ALLOWED, ServiceConfigKeys.DEFAULT_FLOW_CONCURRENCY_ALLOWED); this.quotaManager = GobblinConstructorUtils.invokeConstructor(UserQuotaManager.class, ConfigUtils.getString(config, ServiceConfigKeys.QUOTA_MANAGER_CLASS, ServiceConfigKeys.DEFAULT_QUOTA_MANAGER), config); - this.sharedFlowMetricsContainer = sharedFlowMetricsContainer; + this.sharedFlowMetricsSingleton = sharedFlowMetricsSingleton; this.flowCatalog = flowCatalog; + this.flowCompilationValidationHelper = new FlowCompilationValidationHelper(sharedFlowMetricsSingleton, specCompiler, + quotaManager, eventSubmitter, flowStatusGenerator, isFlowConcurrencyEnabled); TimeUnit timeUnit = TimeUnit.valueOf(ConfigUtils.getString(config, FAILED_DAG_RETENTION_TIME_UNIT, DEFAULT_FAILED_DAG_RETENTION_TIME_UNIT)); this.failedDagRetentionTime = timeUnit.toMillis(ConfigUtils.getLong(config, FAILED_DAG_RETENTION_TIME, DEFAULT_FAILED_DAG_RETENTION_TIME)); } @@ -298,9 +287,9 @@ private static LinkedBlockingDeque[] initializeDagQueue(int numThreads) { @Inject public DagManager(Config config, JobStatusRetriever jobStatusRetriever, - SharedFlowMetricsContainer sharedFlowMetricsContainer, FlowStatusGenerator flowStatusGenerator, + SharedFlowMetricsSingleton sharedFlowMetricsSingleton, FlowStatusGenerator flowStatusGenerator, FlowCatalog flowCatalog) { - this(config, jobStatusRetriever, sharedFlowMetricsContainer, flowStatusGenerator, flowCatalog, true); + this(config, jobStatusRetriever, sharedFlowMetricsSingleton, flowStatusGenerator, flowCatalog, true); } /** Do Nothing on service startup. Scheduling of {@link DagManagerThread}s and loading of any {@link Dag}s is done @@ -516,8 +505,7 @@ public void handleLaunchFlowEvent(DagActionStore.DagAction action) { URI flowUri = FlowSpec.Utils.createFlowSpecUri(flowId); spec = (FlowSpec) flowCatalog.getSpecs(flowUri); Optional> optionalJobExecutionPlanDag = - ExecutionChecksUtil.handleChecksBeforeExecution(sharedFlowMetricsContainer, specCompiler, quotaManager, - eventSubmitter, flowStatusGenerator, log, flowConcurrencyFlag, spec); + this.flowCompilationValidationHelper.createExecutionPlanIfValid(spec); if (optionalJobExecutionPlanDag.isPresent()) { addDag(optionalJobExecutionPlanDag.get(), true, true); } diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java index 583f024fd17..2f77b1752d7 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java @@ -59,8 +59,8 @@ import org.apache.gobblin.service.modules.flow.SpecCompiler; import org.apache.gobblin.service.modules.flowgraph.Dag; import org.apache.gobblin.service.modules.spec.JobExecutionPlan; -import org.apache.gobblin.service.modules.utils.ExecutionChecksUtil; -import org.apache.gobblin.service.modules.utils.SharedFlowMetricsContainer; +import org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper; +import org.apache.gobblin.service.modules.utils.SharedFlowMetricsSingleton; import org.apache.gobblin.service.monitoring.FlowStatusGenerator; import org.apache.gobblin.util.ClassAliasResolver; import org.apache.gobblin.util.ConfigUtils; @@ -86,7 +86,7 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable { protected final MetricContext metricContext; protected final Optional eventSubmitter; - private final boolean flowConcurrencyFlag; + private final boolean isFlowConcurrencyEnabled; @Getter private Optional flowOrchestrationSuccessFulMeter; @Getter @@ -97,22 +97,23 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable { private FlowStatusGenerator flowStatusGenerator; private UserQuotaManager quotaManager; + private final FlowCompilationValidationHelper flowCompilationValidationHelper; private Optional flowTriggerHandler; @Getter - private SharedFlowMetricsContainer sharedFlowMetricsContainer; + private SharedFlowMetricsSingleton sharedFlowMetricsSingleton; private final ClassAliasResolver aliasResolver; public Orchestrator(Config config, Optional topologyCatalog, Optional dagManager, Optional log, FlowStatusGenerator flowStatusGenerator, boolean instrumentationEnabled, - Optional flowTriggerHandler, SharedFlowMetricsContainer sharedFlowMetricsContainer) { + Optional flowTriggerHandler, SharedFlowMetricsSingleton sharedFlowMetricsSingleton) { _log = log.isPresent() ? log.get() : LoggerFactory.getLogger(getClass()); this.aliasResolver = new ClassAliasResolver<>(SpecCompiler.class); this.topologyCatalog = topologyCatalog; this.dagManager = dagManager; this.flowStatusGenerator = flowStatusGenerator; this.flowTriggerHandler = flowTriggerHandler; - this.sharedFlowMetricsContainer = sharedFlowMetricsContainer; + this.sharedFlowMetricsSingleton = sharedFlowMetricsSingleton; try { String specCompilerClassName = ServiceConfigKeys.DEFAULT_GOBBLIN_SERVICE_FLOWCOMPILER_CLASS; if (config.hasPath(ServiceConfigKeys.GOBBLIN_SERVICE_FLOWCOMPILER_CLASS_KEY)) { @@ -144,19 +145,21 @@ public Orchestrator(Config config, Optional topologyCatalog, Op this.flowOrchestrationTimer = Optional.absent(); this.eventSubmitter = Optional.absent(); } - this.flowConcurrencyFlag = ConfigUtils.getBoolean(config, ServiceConfigKeys.FLOW_CONCURRENCY_ALLOWED, + this.isFlowConcurrencyEnabled = ConfigUtils.getBoolean(config, ServiceConfigKeys.FLOW_CONCURRENCY_ALLOWED, ServiceConfigKeys.DEFAULT_FLOW_CONCURRENCY_ALLOWED); quotaManager = GobblinConstructorUtils.invokeConstructor(UserQuotaManager.class, ConfigUtils.getString(config, ServiceConfigKeys.QUOTA_MANAGER_CLASS, ServiceConfigKeys.DEFAULT_QUOTA_MANAGER), config); + this.flowCompilationValidationHelper = new FlowCompilationValidationHelper(sharedFlowMetricsSingleton, specCompiler, + quotaManager, eventSubmitter, flowStatusGenerator, isFlowConcurrencyEnabled); } @Inject public Orchestrator(Config config, FlowStatusGenerator flowStatusGenerator, Optional topologyCatalog, Optional dagManager, Optional log, Optional flowTriggerHandler, - SharedFlowMetricsContainer sharedFlowMetricsContainer) { + SharedFlowMetricsSingleton sharedFlowMetricsSingleton) { this(config, topologyCatalog, dagManager, log, flowStatusGenerator, true, flowTriggerHandler, - sharedFlowMetricsContainer); + sharedFlowMetricsSingleton); } @@ -230,10 +233,11 @@ public void orchestrate(Spec spec, Properties jobProps, long triggerTimestampMil String flowGroup = flowConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY); String flowName = flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY); - sharedFlowMetricsContainer.addFlowGauge(spec, flowConfig, flowName, flowGroup); - - if (!ExecutionChecksUtil.isExecutionPermittedHandler(sharedFlowMetricsContainer, specCompiler, quotaManager, - eventSubmitter, flowStatusGenerator, _log, flowConcurrencyFlag, flowConfig, spec, flowName, flowGroup)) { + sharedFlowMetricsSingleton.addFlowGauge(spec, flowConfig, flowGroup, flowName); + Optional> jobExecutionPlanDagOptional = + this.flowCompilationValidationHelper.validateAndHandleConcurrentExecution(flowConfig, spec, flowGroup, + flowName); + if (!jobExecutionPlanDagOptional.isPresent()) { return; } Map flowMetadata = TimingEventUtils.getFlowMetadata((FlowSpec) spec); @@ -260,21 +264,19 @@ public void orchestrate(Spec spec, Properties jobProps, long triggerTimestampMil _log.info("Multi-active scheduler finished handling trigger event: [{}, triggerEventTimestamp: {}]", flowAction, triggerTimestampMillis); } else { - // Compile flow spec and do corresponding checks - Dag jobExecutionPlanDag = specCompiler.compileFlow(spec); Optional flowCompilationTimer = this.eventSubmitter.transform(submitter -> new TimingEvent(submitter, TimingEvent.FlowTimings.FLOW_COMPILED)); - + Dag jobExecutionPlanDag = jobExecutionPlanDagOptional.get(); if (jobExecutionPlanDag == null || jobExecutionPlanDag.isEmpty()) { - ExecutionChecksUtil.populateFlowCompilationFailedEventMessage(eventSubmitter, spec, flowMetadata); + FlowCompilationValidationHelper.populateFlowCompilationFailedEventMessage(eventSubmitter, spec, flowMetadata); Instrumented.markMeter(this.flowOrchestrationFailedMeter); - sharedFlowMetricsContainer.conditionallyUpdateFlowGaugeSpecState(spec, SharedFlowMetricsContainer.CompiledState.FAILED); + sharedFlowMetricsSingleton.conditionallyUpdateFlowGaugeSpecState(spec, SharedFlowMetricsSingleton.CompiledState.FAILED); _log.warn("Cannot determine an executor to run on for Spec: " + spec); return; } - sharedFlowMetricsContainer.conditionallyUpdateFlowGaugeSpecState(spec, SharedFlowMetricsContainer.CompiledState.SUCCESSFUL); + sharedFlowMetricsSingleton.conditionallyUpdateFlowGaugeSpecState(spec, SharedFlowMetricsSingleton.CompiledState.SUCCESSFUL); - ExecutionChecksUtil.addFlowExecutionIdIfAbsent(flowMetadata, jobExecutionPlanDag); + FlowCompilationValidationHelper.addFlowExecutionIdIfAbsent(flowMetadata, jobExecutionPlanDag); if (flowCompilationTimer.isPresent()) { flowCompilationTimer.get().stop(flowMetadata); } @@ -326,8 +328,7 @@ public void orchestrate(Spec spec, Properties jobProps, long triggerTimestampMil public void submitFlowToDagManager(FlowSpec flowSpec) throws IOException, InterruptedException { Optional> optionalJobExecutionPlanDag = - ExecutionChecksUtil.handleChecksBeforeExecution(sharedFlowMetricsContainer, specCompiler, quotaManager, - eventSubmitter, flowStatusGenerator, _log, flowConcurrencyFlag, flowSpec); + this.flowCompilationValidationHelper.createExecutionPlanIfValid(flowSpec); if (optionalJobExecutionPlanDag.isPresent()) { submitFlowToDagManager(flowSpec, optionalJobExecutionPlanDag.get()); } diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java index d929e75f02d..f71093f5286 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java @@ -31,7 +31,7 @@ import org.apache.commons.lang.StringUtils; import org.apache.gobblin.runtime.api.SpecNotFoundException; -import org.apache.gobblin.service.modules.utils.SharedFlowMetricsContainer; +import org.apache.gobblin.service.modules.utils.SharedFlowMetricsSingleton; import org.apache.helix.HelixManager; import org.quartz.CronExpression; import org.quartz.DisallowConcurrentExecution; @@ -172,7 +172,6 @@ public GobblinServiceJobScheduler(@Named(InjectionNames.SERVICE_NAME) String ser Orchestrator orchestrator, SchedulerService schedulerService, Optional quotaManager, Optional log, @Named(InjectionNames.WARM_STANDBY_ENABLED) boolean warmStandbyEnabled, Optional flowTriggerHandler) throws Exception { - // TODO: do i need to initialize sharedFlowMetricsContainer here? super(ConfigUtils.configToProperties(config), schedulerService); _log = log.isPresent() ? log.get() : LoggerFactory.getLogger(getClass()); @@ -211,11 +210,11 @@ public GobblinServiceJobScheduler(String serviceName, Config config, FlowStatusG Optional helixManager, Optional flowCatalog, Optional topologyCatalog, Optional dagManager, Optional quotaManager, SchedulerService schedulerService, Optional log, boolean warmStandbyEnabled, Optional flowTriggerHandler, - SharedFlowMetricsContainer sharedFlowMetricsContainer) + SharedFlowMetricsSingleton sharedFlowMetricsSingleton) throws Exception { this(serviceName, config, helixManager, flowCatalog, topologyCatalog, new Orchestrator(config, flowStatusGenerator, topologyCatalog, dagManager, log, flowTriggerHandler, - sharedFlowMetricsContainer), + sharedFlowMetricsSingleton), schedulerService, quotaManager, log, warmStandbyEnabled, flowTriggerHandler); } diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/ExecutionChecksUtil.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java similarity index 73% rename from gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/ExecutionChecksUtil.java rename to gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java index d1904a34e23..d6ebeb3a1d2 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/ExecutionChecksUtil.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java @@ -21,6 +21,8 @@ import com.typesafe.config.Config; import java.io.IOException; import java.util.Map; +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.instrumented.Instrumented; import org.apache.gobblin.metrics.event.EventSubmitter; @@ -34,22 +36,29 @@ import org.apache.gobblin.service.modules.spec.JobExecutionPlan; import org.apache.gobblin.service.monitoring.FlowStatusGenerator; import org.apache.gobblin.util.ConfigUtils; -import org.slf4j.Logger; /** - * Stateless class with functionality meant to be re-used between the DagManager and Orchestrator when launching + * Helper class with functionality meant to be re-used between the DagManager and Orchestrator when launching * executions of a flow spec. In the common case, the Orchestrator receives a flow to orchestrate, performs necessary * validations, and forwards the execution responsibility to the DagManager. The DagManager's responsibility is to * carry out any flow action requests. However, with launch executions now being stored in the DagActionStateStore, on * restart or leadership change the DagManager has to perform validations before executing any launch actions the * previous leader was unable to complete. Rather than duplicating the code or introducing a circular dependency between - * the DagManager and Orchestrator, this class is utilized to store the common functionality. It is stateless and - * requires all stateful pieces to be passed as input from the caller. - * Note: We expect further refactoring to be done to the DagManager in later stage of multi-active development so we do + * the DagManager and Orchestrator, this class is utilized to store the common functionality. It is stateful, + * requiring all stateful pieces to be passed as input from the caller upon instantiating the helper. + * Note: We expect further refactoring to be done to the DagManager in later stage of multi-active development, so we do * not attempt major reorganization as abstractions may change. */ -public final class ExecutionChecksUtil { +@Slf4j +@AllArgsConstructor +public final class FlowCompilationValidationHelper { + private SharedFlowMetricsSingleton sharedFlowMetricsSingleton; + private SpecCompiler specCompiler; + private UserQuotaManager quotaManager; + private Optional eventSubmitter; + private FlowStatusGenerator flowStatusGenerator; + private boolean isFlowConcurrencyEnabled; /** * For a given a flowSpec, verifies that an execution is allowed (in case there is an ongoing execution) and the @@ -57,60 +66,59 @@ public final class ExecutionChecksUtil { * caller. * @return jobExecutionPlan dag if one can be constructed for the given flowSpec */ - public static Optional> handleChecksBeforeExecution( - SharedFlowMetricsContainer sharedFlowMetricsContainer, SpecCompiler specCompiler, UserQuotaManager quotaManager, - Optional eventSubmitter, FlowStatusGenerator flowStatusGenerator, Logger log, - boolean flowConcurrencyFlag, FlowSpec flowSpec) + public Optional> createExecutionPlanIfValid(FlowSpec flowSpec) throws IOException, InterruptedException { Config flowConfig = flowSpec.getConfig(); String flowGroup = flowConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY); String flowName = flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY); - if (!isExecutionPermittedHandler(sharedFlowMetricsContainer, specCompiler, quotaManager, eventSubmitter, - flowStatusGenerator, log, flowConcurrencyFlag, flowConfig, flowSpec, flowName, flowGroup)) { - return Optional.absent(); - } //Wait for the SpecCompiler to become healthy. specCompiler.awaitHealthy(); - Dag jobExecutionPlanDag = specCompiler.compileFlow(flowSpec); + Optional> jobExecutionPlanDagOptional = + validateAndHandleConcurrentExecution(flowConfig, flowSpec, flowGroup, flowName); + if (!jobExecutionPlanDagOptional.isPresent()) { + return Optional.absent(); + } + Optional flowCompilationTimer = eventSubmitter.transform(submitter -> new TimingEvent(submitter, TimingEvent.FlowTimings.FLOW_COMPILED)); Map flowMetadata = TimingEventUtils.getFlowMetadata(flowSpec); - if (jobExecutionPlanDag == null || jobExecutionPlanDag.isEmpty()) { + if (jobExecutionPlanDagOptional.get() == null || jobExecutionPlanDagOptional.get().isEmpty()) { populateFlowCompilationFailedEventMessage(eventSubmitter, flowSpec, flowMetadata); return Optional.absent(); } - addFlowExecutionIdIfAbsent(flowMetadata, jobExecutionPlanDag); + addFlowExecutionIdIfAbsent(flowMetadata, jobExecutionPlanDagOptional.get()); if (flowCompilationTimer.isPresent()) { flowCompilationTimer.get().stop(flowMetadata); } - return Optional.of(jobExecutionPlanDag); + return jobExecutionPlanDagOptional; } /** * Checks if flowSpec disallows concurrent executions, and if so then checks if another instance of the flow is * already running and emits a FLOW FAILED event. Otherwise, this check passes. - * @return true if caller can proceed to execute flow, false otherwise + * @return Optional> if caller allowed to execute flow and compile spec, else absent Optional * @throws IOException */ - public static boolean isExecutionPermittedHandler(SharedFlowMetricsContainer sharedFlowMetricsContainer, - SpecCompiler specCompiler, UserQuotaManager quotaManager, Optional eventSubmitter, - FlowStatusGenerator flowStatusGenerator, Logger log, boolean flowConcurrencyFlag, Config flowConfig, Spec spec, - String flowName, String flowGroup) throws IOException { + public Optional> validateAndHandleConcurrentExecution(Config flowConfig, Spec spec, + String flowGroup, String flowName) throws IOException { boolean allowConcurrentExecution = ConfigUtils.getBoolean(flowConfig, - ConfigurationKeys.FLOW_ALLOW_CONCURRENT_EXECUTION, flowConcurrencyFlag); + ConfigurationKeys.FLOW_ALLOW_CONCURRENT_EXECUTION, isFlowConcurrencyEnabled); Dag jobExecutionPlanDag = specCompiler.compileFlow(spec); - if (!isExecutionPermitted(flowStatusGenerator, flowName, flowGroup, allowConcurrentExecution)) { + if (isExecutionPermitted(flowStatusGenerator, flowName, flowGroup, allowConcurrentExecution)) { + return Optional.of(jobExecutionPlanDag); + } else { log.warn("Another instance of flowGroup: {}, flowName: {} running; Skipping flow execution since " + "concurrent executions are disabled for this flow.", flowGroup, flowName); - sharedFlowMetricsContainer.conditionallyUpdateFlowGaugeSpecState(spec, SharedFlowMetricsContainer.CompiledState.SKIPPED); - Instrumented.markMeter(sharedFlowMetricsContainer.getSkippedFlowsMeter()); - if (!((FlowSpec) spec).getConfigAsProperties().containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) { + sharedFlowMetricsSingleton.conditionallyUpdateFlowGaugeSpecState(spec, + SharedFlowMetricsSingleton.CompiledState.SKIPPED); + Instrumented.markMeter(sharedFlowMetricsSingleton.getSkippedFlowsMeter()); + if (!isScheduledFlow((FlowSpec) spec)) { // For ad-hoc flow, we might already increase quota, we need to decrease here for (Dag.DagNode dagNode : jobExecutionPlanDag.getStartNodes()) { quotaManager.releaseQuota(dagNode); @@ -124,9 +132,8 @@ public static boolean isExecutionPermittedHandler(SharedFlowMetricsContainer sha if (eventSubmitter.isPresent()) { new TimingEvent(eventSubmitter.get(), TimingEvent.FlowTimings.FLOW_FAILED).stop(flowMetadata); } - return false; + return Optional.absent(); } - return true; } /** @@ -137,13 +144,9 @@ public static boolean isExecutionPermittedHandler(SharedFlowMetricsContainer sha * @param allowConcurrentExecution * @return true if the {@link FlowSpec} allows concurrent executions or if no other instance of the flow is currently RUNNING. */ - private static boolean isExecutionPermitted(FlowStatusGenerator flowStatusGenerator, String flowName, - String flowGroup, boolean allowConcurrentExecution) { - if (allowConcurrentExecution) { - return true; - } else { - return !flowStatusGenerator.isFlowRunning(flowName, flowGroup); - } + private boolean isExecutionPermitted(FlowStatusGenerator flowStatusGenerator, String flowName, String flowGroup, + boolean allowConcurrentExecution) { + return allowConcurrentExecution || !flowStatusGenerator.isFlowRunning(flowName, flowGroup); } /** @@ -177,9 +180,17 @@ public static void populateFlowCompilationFailedEventMessage(Optional flowMetadata, Dag jobExecutionPlanDag) { + public static void addFlowExecutionIdIfAbsent(Map flowMetadata, + Dag jobExecutionPlanDag) { flowMetadata.putIfAbsent(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, jobExecutionPlanDag.getNodes().get(0).getValue().getJobSpec().getConfigAsProperties().getProperty( ConfigurationKeys.FLOW_EXECUTION_ID_KEY)); } + + /** + * Return true if the spec contains a schedule, false otherwise. + */ + public static boolean isScheduledFlow(FlowSpec spec) { + return spec.getConfigAsProperties().containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY); + } } diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/SharedFlowMetricsContainer.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/SharedFlowMetricsSingleton.java similarity index 78% rename from gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/SharedFlowMetricsContainer.java rename to gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/SharedFlowMetricsSingleton.java index 0bdc282e340..ab9bb5f97f1 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/SharedFlowMetricsContainer.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/SharedFlowMetricsSingleton.java @@ -22,6 +22,7 @@ import com.google.common.base.Optional; import com.google.common.collect.Maps; import com.typesafe.config.Config; +import java.net.URI; import java.util.Map; import javax.inject.Inject; import javax.inject.Singleton; @@ -44,29 +45,49 @@ */ @Singleton @Data -public class SharedFlowMetricsContainer { +public class SharedFlowMetricsSingleton { protected final MetricContext metricContext; - private Map flowGauges = Maps.newHashMap(); + private Map flowGaugeStateBySpecUri = Maps.newHashMap(); private Optional skippedFlowsMeter; + @Setter + public static class FlowCompiledState { + private CompiledState state = CompiledState.UNKNOWN; + } + + public enum CompiledState { + FAILED(-1), + UNKNOWN(0), + SUCCESSFUL(1), + SKIPPED(2); + + public int value; + + CompiledState(int value) { + this.value = value; + } + } + @Inject - public SharedFlowMetricsContainer(Config config) { - this.metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(config), SharedFlowMetricsContainer.class); + public SharedFlowMetricsSingleton(Config config) { + this.metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(config), + SharedFlowMetricsSingleton.class); this.skippedFlowsMeter = Optional.of(metricContext.contextAwareMeter(ServiceMetricNames.SKIPPED_FLOWS)); } /** * Adds a new FlowGauge to the metric context if one does not already exist for this flow spec */ - public void addFlowGauge(Spec spec, Config flowConfig, String flowName, String flowGroup) { + public void addFlowGauge(Spec spec, Config flowConfig, String flowGroup, String flowName) { // Only register the metric of flows that are scheduled, run once flows should not be tracked indefinitely - if (!flowGauges.containsKey(spec.getUri().toString()) && flowConfig.hasPath(ConfigurationKeys.JOB_SCHEDULE_KEY)) { - String flowCompiledGaugeName = - MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, flowGroup, flowName, ServiceMetricNames.COMPILED); - flowGauges.put(spec.getUri().toString(), new FlowCompiledState()); + if (!flowGaugeStateBySpecUri.containsKey(spec.getUri()) + && flowConfig.hasPath(ConfigurationKeys.JOB_SCHEDULE_KEY)) { + String flowCompiledGaugeName = MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, flowGroup, flowName, + ServiceMetricNames.COMPILED); + flowGaugeStateBySpecUri.put(spec.getUri(), new FlowCompiledState()); ContextAwareGauge gauge = RootMetricContext.get().newContextAwareGauge(flowCompiledGaugeName, - () -> flowGauges.get(spec.getUri().toString()).state.value); + () -> flowGaugeStateBySpecUri.get(spec.getUri()).state.value); RootMetricContext.get().register(flowCompiledGaugeName, gauge); } } @@ -76,26 +97,8 @@ public void addFlowGauge(Spec spec, Config flowConfig, String flowName, String f * @param state desired state to set the gauge */ public void conditionallyUpdateFlowGaugeSpecState(Spec spec, CompiledState state) { - if (flowGauges.containsKey(spec.getUri().toString())) { - flowGauges.get(spec.getUri().toString()).setState(state); - } - } - - @Setter - public static class FlowCompiledState { - private CompiledState state = CompiledState.UNKNOWN; - } - - public enum CompiledState { - FAILED(-1), - UNKNOWN(0), - SUCCESSFUL(1), - SKIPPED(2); - - public int value; - - CompiledState(int value) { - this.value = value; + if (flowGaugeStateBySpecUri.containsKey(spec.getUri())) { + flowGaugeStateBySpecUri.get(spec.getUri()).setState(state); } } } diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java index d694e656c38..5445a209670 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java @@ -26,7 +26,7 @@ import java.util.concurrent.TimeoutException; import org.apache.gobblin.runtime.spec_catalog.FlowCatalog; -import org.apache.gobblin.service.modules.utils.SharedFlowMetricsContainer; +import org.apache.gobblin.service.modules.utils.SharedFlowMetricsSingleton; import org.apache.gobblin.service.monitoring.FlowStatusGenerator; import org.mockito.Mockito; import org.testng.Assert; @@ -344,7 +344,7 @@ public boolean apply(@Nullable Void input) { class MockedDagManager extends DagManager { public MockedDagManager(Config config, boolean instrumentationEnabled) { - super(config, createJobStatusRetriever(), Mockito.mock(SharedFlowMetricsContainer.class), + super(config, createJobStatusRetriever(), Mockito.mock(SharedFlowMetricsSingleton.class), Mockito.mock(FlowStatusGenerator.class), Mockito.mock(FlowCatalog.class), instrumentationEnabled); } diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java index f12489b75bb..2d3f4b3f917 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java @@ -18,36 +18,23 @@ package org.apache.gobblin.service.modules.orchestration; import com.codahale.metrics.MetricRegistry; +import com.google.common.base.Optional; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.typesafe.config.Config; import java.io.File; import java.net.URI; import java.net.URISyntaxException; import java.util.Collection; import java.util.List; import java.util.Properties; - import org.apache.commons.io.FileUtils; - import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.metrics.MetricContext; import org.apache.gobblin.metrics.ServiceMetricNames; -import org.apache.gobblin.runtime.api.SpecCatalogListener; -import org.apache.gobblin.service.modules.utils.SharedFlowMetricsContainer; -import org.apache.hadoop.fs.Path; -import org.mockito.Mockito; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.testng.Assert; -import org.testng.annotations.AfterClass; -import org.testng.annotations.BeforeClass; -import org.testng.annotations.Test; - -import com.google.common.base.Optional; -import com.google.gson.Gson; -import com.google.gson.GsonBuilder; -import com.typesafe.config.Config; - import org.apache.gobblin.runtime.api.FlowSpec; import org.apache.gobblin.runtime.api.Spec; +import org.apache.gobblin.runtime.api.SpecCatalogListener; import org.apache.gobblin.runtime.api.SpecExecutor; import org.apache.gobblin.runtime.api.TopologySpec; import org.apache.gobblin.runtime.app.ServiceBasedAppLauncher; @@ -55,9 +42,17 @@ import org.apache.gobblin.runtime.spec_catalog.TopologyCatalog; import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor; import org.apache.gobblin.service.modules.flow.IdentityFlowToJobSpecCompiler; +import org.apache.gobblin.service.modules.utils.SharedFlowMetricsSingleton; import org.apache.gobblin.service.monitoring.FlowStatusGenerator; import org.apache.gobblin.util.ConfigUtils; import org.apache.gobblin.util.PathUtils; +import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; import static org.mockito.Mockito.*; @@ -113,7 +108,7 @@ public void setup() throws Exception { this._mockFlowTriggerHandler = mock(FlowTriggerHandler.class); this.orchestrator = new Orchestrator(ConfigUtils.propertiesToConfig(orchestratorProperties), this.mockStatusGenerator, Optional.of(this.topologyCatalog), Optional.absent(), Optional.of(logger), - Optional.of(this._mockFlowTriggerHandler), new SharedFlowMetricsContainer( + Optional.of(this._mockFlowTriggerHandler), new SharedFlowMetricsSingleton( ConfigUtils.propertiesToConfig(orchestratorProperties))); this.topologyCatalog.addListener(orchestrator); this.flowCatalog.addListener(orchestrator); @@ -335,7 +330,7 @@ public void deleteFlowSpec() throws Exception { @Test (dependsOnMethods = "deleteFlowSpec") public void doNotRegisterMetricsAdhocFlows() throws Exception { - MetricContext metricContext = this.orchestrator.getSharedFlowMetricsContainer().getMetricContext(); //.getMetricContext(); + MetricContext metricContext = this.orchestrator.getSharedFlowMetricsSingleton().getMetricContext(); this.topologyCatalog.getInitComplete().countDown(); // unblock orchestration Properties flowProps = new Properties(); flowProps.setProperty(ConfigurationKeys.FLOW_NAME_KEY, "flow0");