Skip to content

Commit

Permalink
[Fix-3153] Alert information cannot be sent in cancelling state (#3157)
Browse files Browse the repository at this point in the history
* fix issue 3153

* optimize alert logic

* fix spotless check
  • Loading branch information
SbloodyS authored Oct 12, 2023
1 parent 762923a commit bd2f754
Showing 1 changed file with 22 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -307,15 +307,29 @@ and the status is not obtained this time (flink rest server is closed),

if (flag != null) {
log.info("FlinkAppHttpWatcher previous state: canceling.");
if (stopFromEnum.isNone()) {
log.error(
"FlinkAppHttpWatcher query previous state was canceling and stopFrom NotFound,savePoint expired!");
savePointService.expire(application.getId());
FlinkAppStateEnum flinkAppStateEnum = FlinkAppStateEnum.CANCELED;
try {
YarnAppInfo yarnAppInfo = httpYarnAppInfo(application);
if (yarnAppInfo != null) {
String state = yarnAppInfo.getApp().getFinalStatus();
flinkAppStateEnum = FlinkAppStateEnum.of(state);
}
} finally {
if (stopFromEnum.isNone()) {
log.error(
"FlinkAppHttpWatcher query previous state was canceling and stopFrom NotFound,savePoint expired!");
savePointService.expire(application.getId());
if (flinkAppStateEnum == FlinkAppStateEnum.KILLED
|| flinkAppStateEnum == FlinkAppStateEnum.FAILED) {
doAlert(application, flinkAppStateEnum);
}
}
application.setState(flinkAppStateEnum.getValue());
cleanSavepoint(application);
cleanOptioning(optionStateEnum, application.getId());
doPersistMetrics(application, true);
}
application.setState(FlinkAppStateEnum.CANCELED.getValue());
cleanSavepoint(application);
cleanOptioning(optionStateEnum, application.getId());
doPersistMetrics(application, true);

} else {
// query the status from the yarn rest Api
YarnAppInfo yarnAppInfo = httpYarnAppInfo(application);
Expand Down

0 comments on commit bd2f754

Please sign in to comment.