From 1e71f37645e243f74c4a08ba25935ef54546967d Mon Sep 17 00:00:00 2001
From: benjobs
Date: Sat, 11 Nov 2023 16:16:01 +0800
Subject: [PATCH] [Improve] job state improvement

---
 .../core/task/FlinkRESTAPIWatcher.java         | 60 ++++++++++---------
 .../flink/client/trait/FlinkClientTrait.scala  | 21 ++++---
 2 files changed, 44 insertions(+), 37 deletions(-)

diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkRESTAPIWatcher.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkRESTAPIWatcher.java
index a114517649..fab29a74e0 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkRESTAPIWatcher.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkRESTAPIWatcher.java
@@ -207,6 +207,7 @@ private void watch(Long key, Application application) {
         STOP_FROM_MAP.getOrDefault(key, null) == null ? StopFrom.NONE : STOP_FROM_MAP.get(key);
+    final OptionState optionState = OPTIONING.get(key);
     try {
       // query status from flink rest api
       getFromFlinkRestApi(application, stopFrom);
@@ -219,36 +220,37 @@
        Query from flink's restAPI and yarn's restAPI both failed.
        In this case, it is necessary to decide whether to return to the final state depending on the state being operated
       */
-      // non-mapping
-      if (application.getState() != FlinkAppState.MAPPING.getValue()) {
-        log.error(
-            "FlinkRESTAPIWatcher getFromFlinkRestApi and getFromYarnRestApi error,job failed,savePoint expired!");
-        if (StopFrom.NONE.equals(stopFrom)) {
-          savePointService.expire(application.getId());
-          application.setState(FlinkAppState.LOST.getValue());
-          alertService.alert(application, FlinkAppState.LOST);
-        } else {
-          application.setState(FlinkAppState.CANCELED.getValue());
+      if (optionState == null || !optionState.equals(OptionState.STARTING)) {
+        // non-mapping
+        if (application.getState() != FlinkAppState.MAPPING.getValue()) {
+          log.error(
+              "FlinkRESTAPIWatcher getFromFlinkRestApi and getFromYarnRestApi error,job failed,savePoint expired!");
+          if (StopFrom.NONE.equals(stopFrom)) {
+            savePointService.expire(application.getId());
+            application.setState(FlinkAppState.LOST.getValue());
+            alertService.alert(application, FlinkAppState.LOST);
+          } else {
+            application.setState(FlinkAppState.CANCELED.getValue());
+          }
         }
-      }
-      /*
-       This step means that the above two ways to get information have failed, and this step is the last step,
-       which will directly identify the mission as cancelled or lost.
-       Need clean savepoint.
-      */
-      application.setEndTime(new Date());
-      cleanSavepoint(application);
-      OptionState optionState = OPTIONING.get(key);
-      cleanOptioning(optionState, key);
-      doPersistMetrics(application, true);
-      FlinkAppState appState = FlinkAppState.of(application.getState());
-      if (appState.equals(FlinkAppState.FAILED) || appState.equals(FlinkAppState.LOST)) {
-        alertService.alert(application, FlinkAppState.of(application.getState()));
-        if (appState.equals(FlinkAppState.FAILED)) {
-          try {
-            applicationService.start(application, true);
-          } catch (Exception e) {
-            log.error(e.getMessage(), e);
+        /*
+         This step means that the above two ways to get information have failed, and this step is the last step,
+         which will directly identify the mission as cancelled or lost.
+         Need clean savepoint.
+        */
+        application.setEndTime(new Date());
+        cleanSavepoint(application);
+        cleanOptioning(optionState, key);
+        doPersistMetrics(application, true);
+        FlinkAppState appState = FlinkAppState.of(application.getState());
+        if (appState.equals(FlinkAppState.FAILED) || appState.equals(FlinkAppState.LOST)) {
+          alertService.alert(application, FlinkAppState.of(application.getState()));
+          if (appState.equals(FlinkAppState.FAILED)) {
+            try {
+              applicationService.start(application, true);
+            } catch (Exception e) {
+              log.error(e.getMessage(), e);
+            }
           }
         }
       }
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
index fd26c710c7..6a2e8c95c1 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
@@ -233,18 +233,23 @@ trait FlinkClientTrait extends Logger {
       submitRequest: SubmitRequest,
       jarFile: File): (PackagedProgram, JobGraph) = {
-    val packageProgram = PackagedProgram.newBuilder
+    val pgkBuilder = PackagedProgram.newBuilder
       .setJarFile(jarFile)
-      .setUserClassPaths(
-        Lists.newArrayList(submitRequest.flinkVersion.flinkLibs: _*)
-      )
       .setEntryPointClassName(
         flinkConfig.getOptional(ApplicationConfiguration.APPLICATION_MAIN_CLASS).get())
       .setSavepointRestoreSettings(submitRequest.savepointRestoreSettings)
-      .setArguments(flinkConfig
-        .getOptional(ApplicationConfiguration.APPLICATION_ARGS)
-        .orElse(Lists.newArrayList()): _*)
-      .build()
+      .setArguments(
+        flinkConfig
+          .getOptional(ApplicationConfiguration.APPLICATION_ARGS)
+          .orElse(Lists.newArrayList()): _*)
+    // userClassPath...
+    submitRequest.executionMode match {
+      case ExecutionMode.REMOTE | ExecutionMode.YARN_PER_JOB =>
+        pgkBuilder.setUserClassPaths(submitRequest.flinkVersion.flinkLibs)
+      case _ =>
+    }
+
+    val packageProgram = pgkBuilder.build()

     val jobGraph = PackagedProgramUtils.createJobGraph(
       packageProgram,
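
Reviewer note on the FlinkRESTAPIWatcher hunk: the new optionState guard means that when both the Flink REST API and the YARN REST API lookups fail, the watcher only finalizes the job (marking it LOST or CANCELED, expiring the savepoint, persisting metrics) if no start operation is still in flight. The sketch below restates just that decision in isolation; it is a simplified model using stand-in types (OptionState, StopFrom, FinalState), not StreamPark's real classes, and it omits the MAPPING check and the cleanup steps.

object WatcherGuardSketch {

  sealed trait OptionState
  case object Starting extends OptionState
  case object Stopping extends OptionState

  sealed trait StopFrom
  case object NoStopRequested extends StopFrom
  case object StopRequested extends StopFrom

  sealed trait FinalState
  case object Lost extends FinalState
  case object Canceled extends FinalState

  // Core of the added guard: while a start is still in progress the watcher leaves
  // the application untouched (None); otherwise a failed lookup becomes LOST when no
  // stop was requested and CANCELED when one was.
  def finalStateOnLookupFailure(option: Option[OptionState], stop: StopFrom): Option[FinalState] =
    option match {
      case Some(Starting) => None
      case _ if stop == NoStopRequested => Some(Lost)
      case _ => Some(Canceled)
    }

  def main(args: Array[String]): Unit = {
    assert(finalStateOnLookupFailure(Some(Starting), NoStopRequested).isEmpty)
    assert(finalStateOnLookupFailure(None, NoStopRequested).contains(Lost))
    assert(finalStateOnLookupFailure(None, StopRequested).contains(Canceled))
  }
}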
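
Reviewer note on the FlinkClientTrait hunk: setUserClassPaths is no longer applied unconditionally when the PackagedProgram is built; only REMOTE and YARN_PER_JOB submissions pass submitRequest.flinkVersion.flinkLibs to the builder, while every other execution mode leaves the user classpath unset. A minimal, self-contained sketch of that selection follows; the ExecutionMode cases, the helper name, and the example URL are simplified stand-ins for illustration, not the project's actual API.

import java.net.URL
import java.util.{Arrays, Collections, List => JList}

object UserClassPathSketch {

  // Stand-in for StreamPark's ExecutionMode enum, limited to cases this patch touches.
  sealed trait ExecutionMode
  case object Remote extends ExecutionMode
  case object YarnPerJob extends ExecutionMode
  case object YarnApplication extends ExecutionMode
  case object KubernetesApplication extends ExecutionMode

  // Mirror of the new match in getJobGraph: only REMOTE and YARN_PER_JOB submissions
  // ship the local flinkLibs as user classpaths; every other mode gets none.
  def userClassPaths(mode: ExecutionMode, flinkLibs: JList[URL]): JList[URL] =
    mode match {
      case Remote | YarnPerJob => flinkLibs
      case _ => Collections.emptyList[URL]()
    }

  def main(args: Array[String]): Unit = {
    // Illustrative path only.
    val libs = Arrays.asList(new URL("file:///opt/flink/lib/flink-dist.jar"))
    println(userClassPaths(Remote, libs))          // prints the libs list
    println(userClassPaths(YarnApplication, libs)) // prints []
  }
}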