Skip to content

Commit

Permalink
[GOBBLIN-1868] Introduce FlowCompilationValidationHelper & SharedFlow…
Browse files Browse the repository at this point in the history
…MetricsSingleton for sharing between Orchestrator & DagManager (#3731)

* refactoring skeleton, builds

* deploy works

* add javadocs

* fix failing test

* respond to comments

* Clean up in response to next review

---------

Co-authored-by: Urmi Mustafi <[email protected]>
  • Loading branch information
umustafi and Urmi Mustafi committed Aug 7, 2023
1 parent 2917b63 commit 01036cc
Show file tree
Hide file tree
Showing 8 changed files with 409 additions and 245 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
import org.apache.gobblin.service.modules.restli.GobblinServiceFlowConfigV2ResourceHandlerWithWarmStandby;
import org.apache.gobblin.service.modules.restli.GobblinServiceFlowExecutionResourceHandlerWithWarmStandby;
import org.apache.gobblin.service.modules.utils.SharedFlowMetricsSingleton;
import org.apache.gobblin.service.monitoring.DagActionStoreChangeMonitor;
import org.apache.gobblin.service.monitoring.DagActionStoreChangeMonitorFactory;
import org.apache.gobblin.service.monitoring.GitConfigMonitor;
Expand Down Expand Up @@ -186,6 +187,8 @@ public void configure(Binder binder) {
binder.bind(RequesterService.class)
.to(NoopRequesterService.class);

binder.bind(SharedFlowMetricsSingleton.class);

OptionalBinder.newOptionalBinder(binder, TopologyCatalog.class);
if (serviceConfig.isTopologyCatalogEnabled()) {
binder.bind(TopologyCatalog.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,19 @@

package org.apache.gobblin.service.modules.orchestration;

import com.codahale.metrics.Meter;
import com.codahale.metrics.Timer;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.eventbus.Subscribe;
import com.google.common.util.concurrent.AbstractIdleService;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigException;
import com.typesafe.config.ConfigFactory;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
Expand All @@ -39,25 +52,9 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import com.codahale.metrics.Meter;
import com.codahale.metrics.Timer;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.eventbus.Subscribe;
import com.google.common.util.concurrent.AbstractIdleService;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigException;
import com.typesafe.config.ConfigFactory;

import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.instrumented.Instrumented;
Expand All @@ -77,9 +74,12 @@
import org.apache.gobblin.service.ExecutionStatus;
import org.apache.gobblin.service.FlowId;
import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.service.modules.flow.SpecCompiler;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.flowgraph.Dag.DagNode;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
import org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
import org.apache.gobblin.service.modules.utils.SharedFlowMetricsSingleton;
import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
import org.apache.gobblin.service.monitoring.JobStatus;
import org.apache.gobblin.service.monitoring.JobStatusRetriever;
Expand Down Expand Up @@ -206,8 +206,12 @@ public String toString() {
protected final Long defaultJobStartSlaTimeMillis;
@Getter
private final JobStatusRetriever jobStatusRetriever;
private final Orchestrator orchestrator;
private final FlowStatusGenerator flowStatusGenerator;
private final UserQuotaManager quotaManager;
private final SpecCompiler specCompiler;
private final boolean isFlowConcurrencyEnabled;
private final FlowCatalog flowCatalog;
private final FlowCompilationValidationHelper flowCompilationValidationHelper;
private final Config config;
private final Optional<EventSubmitter> eventSubmitter;
private final long failedDagRetentionTime;
Expand All @@ -218,7 +222,8 @@ public String toString() {

private volatile boolean isActive = false;

public DagManager(Config config, JobStatusRetriever jobStatusRetriever, Orchestrator orchestrator,
public DagManager(Config config, JobStatusRetriever jobStatusRetriever,
SharedFlowMetricsSingleton sharedFlowMetricsSingleton, FlowStatusGenerator flowStatusGenerator,
FlowCatalog flowCatalog, boolean instrumentationEnabled) {
this.config = config;
this.numThreads = ConfigUtils.getInt(config, NUM_THREADS_KEY, DEFAULT_NUM_THREADS);
Expand All @@ -240,8 +245,18 @@ public DagManager(Config config, JobStatusRetriever jobStatusRetriever, Orchestr
TimeUnit jobStartTimeUnit = TimeUnit.valueOf(ConfigUtils.getString(config, JOB_START_SLA_UNITS, ConfigurationKeys.FALLBACK_GOBBLIN_JOB_START_SLA_TIME_UNIT));
this.defaultJobStartSlaTimeMillis = jobStartTimeUnit.toMillis(ConfigUtils.getLong(config, JOB_START_SLA_TIME, ConfigurationKeys.FALLBACK_GOBBLIN_JOB_START_SLA_TIME));
this.jobStatusRetriever = jobStatusRetriever;
this.orchestrator = orchestrator;
this.flowStatusGenerator = flowStatusGenerator;
this.specCompiler = GobblinConstructorUtils.invokeConstructor(SpecCompiler.class, ConfigUtils.getString(config,
ServiceConfigKeys.GOBBLIN_SERVICE_FLOWCOMPILER_CLASS_KEY,
ServiceConfigKeys.DEFAULT_GOBBLIN_SERVICE_FLOWCOMPILER_CLASS), config);
this.isFlowConcurrencyEnabled = ConfigUtils.getBoolean(config, ServiceConfigKeys.FLOW_CONCURRENCY_ALLOWED,
ServiceConfigKeys.DEFAULT_FLOW_CONCURRENCY_ALLOWED);
this.quotaManager = GobblinConstructorUtils.invokeConstructor(UserQuotaManager.class,
ConfigUtils.getString(config, ServiceConfigKeys.QUOTA_MANAGER_CLASS, ServiceConfigKeys.DEFAULT_QUOTA_MANAGER),
config);
this.flowCatalog = flowCatalog;
this.flowCompilationValidationHelper = new FlowCompilationValidationHelper(sharedFlowMetricsSingleton, specCompiler,
quotaManager, eventSubmitter, flowStatusGenerator, isFlowConcurrencyEnabled);
TimeUnit timeUnit = TimeUnit.valueOf(ConfigUtils.getString(config, FAILED_DAG_RETENTION_TIME_UNIT, DEFAULT_FAILED_DAG_RETENTION_TIME_UNIT));
this.failedDagRetentionTime = timeUnit.toMillis(ConfigUtils.getLong(config, FAILED_DAG_RETENTION_TIME, DEFAULT_FAILED_DAG_RETENTION_TIME));
}
Expand All @@ -266,9 +281,10 @@ private static LinkedBlockingDeque<?>[] initializeDagQueue(int numThreads) {
}

@Inject
public DagManager(Config config, JobStatusRetriever jobStatusRetriever, Orchestrator orchestrator,
public DagManager(Config config, JobStatusRetriever jobStatusRetriever,
SharedFlowMetricsSingleton sharedFlowMetricsSingleton, FlowStatusGenerator flowStatusGenerator,
FlowCatalog flowCatalog) {
this(config, jobStatusRetriever, orchestrator, flowCatalog, true);
this(config, jobStatusRetriever, sharedFlowMetricsSingleton, flowStatusGenerator, flowCatalog, true);
}

/** Do Nothing on service startup. Scheduling of {@link DagManagerThread}s and loading of any {@link Dag}s is done
Expand Down Expand Up @@ -483,7 +499,8 @@ public void handleLaunchFlowEvent(DagActionStore.DagAction action) {
try {
URI flowUri = FlowSpec.Utils.createFlowSpecUri(flowId);
spec = (FlowSpec) flowCatalog.getSpecs(flowUri);
Optional<Dag<JobExecutionPlan>> optionalJobExecutionPlanDag = orchestrator.handleChecksBeforeExecution(spec);
Optional<Dag<JobExecutionPlan>> optionalJobExecutionPlanDag =
this.flowCompilationValidationHelper.createExecutionPlanIfValid(spec);
if (optionalJobExecutionPlanDag.isPresent()) {
addDag(optionalJobExecutionPlanDag.get(), true, true);
}
Expand Down
Loading

0 comments on commit 01036cc

Please sign in to comment.