From bd2f754c4c0f80d03741a71761cb2159a0080ebf Mon Sep 17 00:00:00 2001 From: xiangzihao Date: Wed, 11 Oct 2023 21:32:24 -0500 Subject: [PATCH] [Fix-3153] Alert information cannot be sent in cancelling state (#3157) * fix issue 3153 * optimize alert logic * fix spotless check --- .../core/watcher/FlinkAppHttpWatcher.java | 30 ++++++++++++++----- 1 file changed, 22 insertions(+), 8 deletions(-) diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java index 16bf1c0aec..5bea3a1656 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java @@ -307,15 +307,29 @@ and the status is not obtained this time (flink rest server is closed), if (flag != null) { log.info("FlinkAppHttpWatcher previous state: canceling."); - if (stopFromEnum.isNone()) { - log.error( - "FlinkAppHttpWatcher query previous state was canceling and stopFrom NotFound,savePoint expired!"); - savePointService.expire(application.getId()); + FlinkAppStateEnum flinkAppStateEnum = FlinkAppStateEnum.CANCELED; + try { + YarnAppInfo yarnAppInfo = httpYarnAppInfo(application); + if (yarnAppInfo != null) { + String state = yarnAppInfo.getApp().getFinalStatus(); + flinkAppStateEnum = FlinkAppStateEnum.of(state); + } + } finally { + if (stopFromEnum.isNone()) { + log.error( + "FlinkAppHttpWatcher query previous state was canceling and stopFrom NotFound,savePoint expired!"); + savePointService.expire(application.getId()); + if (flinkAppStateEnum == FlinkAppStateEnum.KILLED + || flinkAppStateEnum == FlinkAppStateEnum.FAILED) { + doAlert(application, flinkAppStateEnum); + } + } + application.setState(flinkAppStateEnum.getValue()); + cleanSavepoint(application); + cleanOptioning(optionStateEnum, application.getId()); + doPersistMetrics(application, true); } - application.setState(FlinkAppStateEnum.CANCELED.getValue()); - cleanSavepoint(application); - cleanOptioning(optionStateEnum, application.getId()); - doPersistMetrics(application, true); + } else { // query the status from the yarn rest Api YarnAppInfo yarnAppInfo = httpYarnAppInfo(application);