[GOBBLIN-1868] Introduce FlowCompilationValidationHelper & SharedFlowMetricsSingleton for sharing between Orchestrator & DagManager #3731
nit on PR title, you're not refactoring common utils, but rather introducing them... maybe "introduce XYZ for sharing between Orch and DM" or "extract XYZ from Orchestrator for sharing w/ DM", etc...
gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/ExecutionChecksUtil.java
 * Note: We expect further refactoring to be done to the DagManager in later stage of multi-active development so we do
 * not attempt major reorganization as abstractions may change.
 */
public final class ExecutionChecksUtil {
FlowCompilationValidationHelper?
also, I completely agree w/ the statelessness (and finality) of this abstraction, but just for comprehensibility I'd suggest creating a ctor merely to separate "the args that stay the same" from "the changing ones", to put the latter front and center.
e.g. so if not called like this:
this.fcHelper = new FlowCompilationValidationHelper(
sharedFlowMetricsContainer, specCompiler, quotaManager,
optEventSubmitter, flowStatusGenerator, log, isFlowConcurrencyEnabled);
... // way later on
Optional<Dag<JobExecutionPlan>> optDag = fcHelper.createExecutionPlanIfValid(flowSpec);
then at least like this:
Optional<Dag<JobExecutionPlan>> optDag = new FlowCompilationValidationHelper(
sharedFlowMetricsContainer, specCompiler, quotaManager,
optEventSubmitter, flowStatusGenerator, log, isFlowConcurrencyEnabled)
.createExecutionPlanIfValid(flowSpec);
p.s. populating the compilation failure message could remain static
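A minimal, self-contained sketch of the constructor pattern suggested above. It is not the actual helper: `CompilationHelperSketch`, the `Function`-based compiler, and the `Consumer`-based failure listener are hypothetical stand-ins for the real collaborators, and it uses `java.util.Optional` rather than whichever `Optional` the project uses.

```java
import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical stand-in for the helper discussed in this thread.
final class CompilationHelperSketch {
  // Collaborators that stay the same across calls are fixed once, at construction.
  private final Function<String, List<String>> compiler;   // stand-in for a spec compiler
  private final Consumer<String> failureListener;           // stand-in for failure reporting

  CompilationHelperSketch(Function<String, List<String>> compiler,
                          Consumer<String> failureListener) {
    this.compiler = compiler;
    this.failureListener = failureListener;
  }

  // Only the argument that changes per call appears in the method signature.
  Optional<List<String>> createExecutionPlanIfValid(String flowSpec) {
    List<String> plan = compiler.apply(flowSpec);
    if (plan == null || plan.isEmpty()) {
      // Report the failed compilation and return an empty result.
      failureListener.accept(flowSpec);
      return Optional.empty();
    }
    return Optional.of(plan);
  }
}
```

With this shape the call site reduces to `helper.createExecutionPlanIfValid(flowSpec)`, which keeps the changing input front and center.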
Good idea, the parameter list to these functions was getting quite long. I went with the first method above.
good work--nice pragmatic solution... and very close now too!
good PR title as well... although the class names there have drifted from those in the latest commit
private SharedFlowMetricsSingleton sharedFlowMetricsSingleton;
private SpecCompiler specCompiler;
private UserQuotaManager quotaManager;
private Optional<EventSubmitter> eventSubmitter;
private FlowStatusGenerator flowStatusGenerator;
private boolean isFlowConcurrencyEnabled;
are all of these really mutable, or could any be final?
Changing all of them to be final and using the @Data constructor instead.
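For readers unfamiliar with the approach: Lombok's `@Data` on a class whose fields are all `final` generates a constructor taking those fields, along with getters. A hand-written sketch of roughly that constructor and those getters is shown here so it compiles without Lombok; the class name and field types are hypothetical, not the real ones.

```java
// Hypothetical immutable helper: every dependency is final and set exactly once.
final class ImmutableHelperSketch {
  private final String compilerName;             // stand-in for a real collaborator
  private final boolean flowConcurrencyEnabled;  // configuration flag, fixed at construction

  // All-final-fields constructor, written by hand for this sketch.
  ImmutableHelperSketch(String compilerName, boolean flowConcurrencyEnabled) {
    this.compilerName = compilerName;
    this.flowConcurrencyEnabled = flowConcurrencyEnabled;
  }

  String getCompilerName() {
    return compilerName;
  }

  boolean isFlowConcurrencyEnabled() {
    return flowConcurrencyEnabled;
  }
}
```

Because no field can be reassigned after construction, an instance can be shared between callers without defensive copying.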
addFlowExecutionIdIfAbsent(flowMetadata, jobExecutionPlanDagOptional.get());
if (flowCompilationTimer.isPresent()) {
  flowCompilationTimer.get().stop(flowMetadata);
}
is this timing still meaningful, given that validateAndHandleConcurrentExecution was called before even starting it? also, is it OK that it's never stopped in the error case?
I moved the start/stop of this to surround the call to validateAndHandleConcurrentExecution, where the compilation is done. In the Orchestrator error case we never call stop, so the event is never submitted unless the compilation is successful; instead we mark the flowOrchestrationFailedMeter. So I added calls to mark that meter when the optional is empty (or in other failure cases) - original PR here.
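A rough sketch of the control flow described in this reply, under stated assumptions: `CompilationTimingSketch` and its counters are hypothetical stand-ins (an `AtomicLong` plays the role of the failure meter, and a counter plays the role of submitted timing events), not the project's metrics classes.

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

// Hypothetical sketch: time only the compilation step, and mark a failure meter
// instead of stopping the timer when compilation yields nothing.
final class CompilationTimingSketch {
  final AtomicLong failedMeter = new AtomicLong();            // stand-in for a failure meter
  final AtomicLong timingEventsSubmitted = new AtomicLong();  // counts "stopped" timers
  volatile long lastCompilationNanos = -1L;                   // last successful duration

  Optional<String> compileWithTiming(String flowSpec,
                                     Function<String, Optional<String>> compile) {
    long startNanos = System.nanoTime();          // start right before compilation
    Optional<String> plan = compile.apply(flowSpec);
    if (!plan.isPresent()) {
      // Failure path: the timer is never stopped, so no timing event is submitted.
      failedMeter.incrementAndGet();
      return Optional.empty();
    }
    // Success path: stop the timer, which is what submits the event.
    lastCompilationNanos = System.nanoTime() - startNanos;
    timingEventsSubmitted.incrementAndGet();
    return plan;
  }
}
```

The trade-off is that failed compilations never show up in the timing data; they are visible only through the failure meter.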
if (!jobExecutionPlanDagOptional.isPresent()) {
  return Optional.absent();
}

Optional<TimingEvent> flowCompilationTimer =
    eventSubmitter.transform(submitter -> new TimingEvent(submitter, TimingEvent.FlowTimings.FLOW_COMPILED));
Map<String, String> flowMetadata = TimingEventUtils.getFlowMetadata(flowSpec);

if (jobExecutionPlanDagOptional.get() == null || jobExecutionPlanDagOptional.get().isEmpty()) {
  populateFlowCompilationFailedEventMessage(eventSubmitter, flowSpec, flowMetadata);
  return Optional.absent();
}

addFlowExecutionIdIfAbsent(flowMetadata, jobExecutionPlanDagOptional.get());
if (flowCompilationTimer.isPresent()) {
  flowCompilationTimer.get().stop(flowMetadata);
}
return jobExecutionPlanDagOptional;
granted, the timer makes it a bit tricky (I have thoughts on reworking if you want to discuss.), but this begins to look to me like:
import org.apache.commons.lang3.tuple.Pair;
...
return jobDagOptional
    .map(dag -> Pair.of(dag, TimingEventUtils.getFlowMetadata(flowSpec)))
    .filter(p -> {
      Dag dag = p.getLeft();
      if (dag != null && !dag.isEmpty()) {
        return true;
      } else {
        populate(...);
        return false;
      }
    }).map(p -> addFlowExecutionIdIfAbsent(p.getRight(), p.getLeft()));
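For comparison, a compilable variant of the chained style, written against `java.util.Optional` (which provides `map` and `filter`) and using only the standard library. Everything here is an assumption for illustration: `OptionalChainSketch`, the `List<String>` standing in for a dag, the `metadataFor` helper, and the placeholder execution id are all hypothetical.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch of validating an optional "dag" with a map/filter/map chain.
final class OptionalChainSketch {
  int failuresReported = 0;  // stand-in for emitting a compilation-failed event

  Optional<List<String>> validate(Optional<List<String>> dagOptional, String flowName) {
    return dagOptional
        // Pair the dag with freshly built metadata.
        .map(dag -> new SimpleEntry<>(dag, metadataFor(flowName)))
        // Drop empty dags, reporting the failure as a side effect.
        .filter(entry -> {
          if (!entry.getKey().isEmpty()) {
            return true;
          }
          failuresReported++;
          return false;
        })
        // Add an execution id to the metadata if absent, then unwrap the dag.
        .map(entry -> {
          entry.getValue().putIfAbsent("flowExecutionId", "hypothetical-id");
          return entry.getKey();
        });
  }

  private static Map<String, String> metadataFor(String flowName) {
    Map<String, String> metadata = new HashMap<>();
    metadata.put("flowName", flowName);
    return metadata;
  }
}
```

Note that the `filter` step carries a side effect, which is part of why an explicit if/else can be easier to follow here.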
Personally I find this a bit unintuitive to read as well. Especially since we're working with one dag at a time, the if/else blocks with the timer are easier to handle in the current state.
Codecov Report
@@ Coverage Diff @@
## master #3731 +/- ##
=========================================
Coverage 47.08% 47.09%
- Complexity 10859 10860 +1
=========================================
Files 2144 2146 +2
Lines 84748 84774 +26
Branches 9410 9409 -1
=========================================
+ Hits 39907 39926 +19
- Misses 41221 41227 +6
- Partials 3620 3621 +1
... and 6 files with indirect coverage changes
…MetricsSingleton for sharing between Orchestrator & DagManager (apache#3731)
* refactoring skeleton, builds
* deploy works
* add javadocs
* fix failing test
* respond to comments
* Clean up in response to next review
---------
Co-authored-by: Urmi Mustafi <[email protected]>
Dear Gobblin maintainers,
Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below!
JIRA
Description
Create a helper class, FlowCompilationValidationHelper, to contain functionality re-used between the DagManager and Orchestrator when launching executions of a flow spec. In the common case, the Orchestrator receives a flow to orchestrate, performs necessary validations, and forwards the execution responsibility to the DagManager. The DagManager's responsibility is to carry out any flow action requests. However, with launch executions now being stored in the DagActionStateStore, on restart or leadership change the DagManager has to perform validations before executing any launch actions the previous leader was unable to complete.
In a previous commit we tried to have the DagManager refer to the Orchestrator to avoid duplicating code, but that introduced a circular dependency between the DagManager and Orchestrator and did not work at runtime. This class is used to store the common functionality. It is stateful, requiring all stateful pieces to be passed as input from the caller upon instantiating the helper.
Note: We expect further refactoring to be done to the DagManager in a later stage of multi-active development, so we do not attempt major reorganization as abstractions may change.
Also creates a class, SharedFlowMetricsSingleton, to store shared metrics between the DagManager and Orchestrator.
Tests
Deploys locally and service starts