Skip to content

Commit

Permalink
[Improve] job state improvement
Browse files Browse the repository at this point in the history
  • Loading branch information
benjobs committed Nov 11, 2023
1 parent 6c48cb0 commit 1e71f37
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ private void watch(Long key, Application application) {
STOP_FROM_MAP.getOrDefault(key, null) == null
? StopFrom.NONE
: STOP_FROM_MAP.get(key);
final OptionState optionState = OPTIONING.get(key);
try {
// query status from flink rest api
getFromFlinkRestApi(application, stopFrom);
Expand All @@ -219,36 +220,37 @@ private void watch(Long key, Application application) {
Query from flink's restAPI and yarn's restAPI both failed.
In this case, it is necessary to decide whether to return to the final state depending on the state being operated
*/
// non-mapping
if (application.getState() != FlinkAppState.MAPPING.getValue()) {
log.error(
"FlinkRESTAPIWatcher getFromFlinkRestApi and getFromYarnRestApi error,job failed,savePoint expired!");
if (StopFrom.NONE.equals(stopFrom)) {
savePointService.expire(application.getId());
application.setState(FlinkAppState.LOST.getValue());
alertService.alert(application, FlinkAppState.LOST);
} else {
application.setState(FlinkAppState.CANCELED.getValue());
if (optionState == null || !optionState.equals(OptionState.STARTING)) {
// non-mapping
if (application.getState() != FlinkAppState.MAPPING.getValue()) {
log.error(
"FlinkRESTAPIWatcher getFromFlinkRestApi and getFromYarnRestApi error,job failed,savePoint expired!");
if (StopFrom.NONE.equals(stopFrom)) {
savePointService.expire(application.getId());
application.setState(FlinkAppState.LOST.getValue());
alertService.alert(application, FlinkAppState.LOST);
} else {
application.setState(FlinkAppState.CANCELED.getValue());
}
}
}
/*
This step means that the above two ways to get information have failed, and this step is the last step,
which will directly identify the mission as cancelled or lost.
Need clean savepoint.
*/
application.setEndTime(new Date());
cleanSavepoint(application);
OptionState optionState = OPTIONING.get(key);
cleanOptioning(optionState, key);
doPersistMetrics(application, true);
FlinkAppState appState = FlinkAppState.of(application.getState());
if (appState.equals(FlinkAppState.FAILED) || appState.equals(FlinkAppState.LOST)) {
alertService.alert(application, FlinkAppState.of(application.getState()));
if (appState.equals(FlinkAppState.FAILED)) {
try {
applicationService.start(application, true);
} catch (Exception e) {
log.error(e.getMessage(), e);
/*
This step means that the above two ways to get information have failed, and this step is the last step,
which will directly identify the mission as cancelled or lost.
Need clean savepoint.
*/
application.setEndTime(new Date());
cleanSavepoint(application);
cleanOptioning(optionState, key);
doPersistMetrics(application, true);
FlinkAppState appState = FlinkAppState.of(application.getState());
if (appState.equals(FlinkAppState.FAILED) || appState.equals(FlinkAppState.LOST)) {
alertService.alert(application, FlinkAppState.of(application.getState()));
if (appState.equals(FlinkAppState.FAILED)) {
try {
applicationService.start(application, true);
} catch (Exception e) {
log.error(e.getMessage(), e);
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,18 +233,23 @@ trait FlinkClientTrait extends Logger {
submitRequest: SubmitRequest,
jarFile: File): (PackagedProgram, JobGraph) = {

val packageProgram = PackagedProgram.newBuilder
val pgkBuilder = PackagedProgram.newBuilder
.setJarFile(jarFile)
.setUserClassPaths(
Lists.newArrayList(submitRequest.flinkVersion.flinkLibs: _*)
)
.setEntryPointClassName(
flinkConfig.getOptional(ApplicationConfiguration.APPLICATION_MAIN_CLASS).get())
.setSavepointRestoreSettings(submitRequest.savepointRestoreSettings)
.setArguments(flinkConfig
.getOptional(ApplicationConfiguration.APPLICATION_ARGS)
.orElse(Lists.newArrayList()): _*)
.build()
.setArguments(
flinkConfig
.getOptional(ApplicationConfiguration.APPLICATION_ARGS)
.orElse(Lists.newArrayList()): _*)
// userClassPath...
submitRequest.executionMode match {
case ExecutionMode.REMOTE | ExecutionMode.YARN_PER_JOB =>
pgkBuilder.setUserClassPaths(submitRequest.flinkVersion.flinkLibs)
case _ =>
}

val packageProgram = pgkBuilder.build()

val jobGraph = PackagedProgramUtils.createJobGraph(
packageProgram,
Expand Down

0 comments on commit 1e71f37

Please sign in to comment.