From a0541f1373b33d880fdbfe1cabbd3f4349600012 Mon Sep 17 00:00:00 2001 From: benjobs Date: Sun, 29 Sep 2024 19:49:52 +0800 Subject: [PATCH] [Improve] spark-job support proxy (#4101) * [Bug] spark-job update bug fixed. * [Improve] spark-app run state minor improvement * [Improve] spark-job detail page style improvements * [Improve] spark-job i18n improvements * [Improve] spark-app cancel bug fixed * [Improve] spark-job build pipeline improvements * [Improve] code style improvements * [Improve] spark build-request bug fixed. * [Improve] spark-app proxy support. * [Improve] check tracking minor improvements --- .../streampark/common/enums/FlinkJobType.java | 6 +- .../streampark/common/enums/SparkJobType.java | 6 +- .../console/core/bean/AppControl.java | 2 + ...java => FlinkBuildPipelineController.java} | 2 +- .../core/controller/ProxyController.java | 44 ++++---- .../SparkApplicationController.java | 14 +-- ...java => SparkBuildPipelineController.java} | 2 +- .../core/controller/SparkProxyController.java | 101 ------------------ .../console/core/entity/FlinkApplication.java | 16 +-- .../core/entity/FlinkApplicationBackUp.java | 4 +- .../console/core/entity/SparkApplication.java | 16 +-- .../core/entity/SparkApplicationBackUp.java | 4 +- .../core/entity/SparkApplicationLog.java | 51 --------- .../core/enums/DistributedTaskEnum.java | 7 +- .../core/enums/SparkOperationEnum.java | 2 +- .../mapper/SparkApplicationLogMapper.java | 25 ----- .../console/core/service/ProxyService.java | 6 +- .../SparkApplicationActionService.java | 2 +- .../SparkApplicationLogService.java | 44 -------- ...java => FlinkAppBuildPipeServiceImpl.java} | 28 ++--- .../FlinkApplicationActionServiceImpl.java | 29 ++--- .../impl/FlinkApplicationInfoServiceImpl.java | 6 +- .../FlinkApplicationManageServiceImpl.java | 21 ++-- .../impl/SparkAppBuildPipeServiceImpl.java | 39 +++---- .../SparkApplicationActionServiceImpl.java | 41 +++---- .../SparkApplicationConfigServiceImpl.java | 9 +- .../impl/SparkApplicationLogServiceImpl.java | 57 ---------- .../SparkApplicationManageServiceImpl.java | 27 +++-- .../impl/DistributedTaskServiceImpl.java | 4 +- .../impl/FlinkSavepointServiceImpl.java | 8 +- .../core/service/impl/ProxyServiceImpl.java | 36 ++++--- .../core/utils/AlertTemplateUtils.java | 6 +- .../core/watcher/FlinkAppHttpWatcher.java | 14 +-- .../core/watcher/SparkAppHttpWatcher.java | 14 +-- .../src/main/resources/db/schema-h2.sql | 20 +--- .../mapper/core/SparkApplicationMapper.xml | 12 +-- .../src/api/spark/app.type.ts | 1 + .../src/locales/lang/en/flink/app.ts | 2 +- .../src/locales/lang/en/spark/app.ts | 2 +- .../src/locales/lang/en/spark/home.ts | 2 +- .../src/locales/lang/zh-CN/menu.ts | 6 +- .../src/locales/lang/zh-CN/spark/app.ts | 44 ++++---- .../src/views/flink/app/View.vue | 1 - .../app/components/AppDetail/DetailTab.vue | 8 +- .../src/views/flink/app/data/detail.data.ts | 4 +- .../src/views/flink/app/hooks/useApp.tsx | 18 ++-- .../src/views/flink/cluster/View.vue | 4 +- .../views/spark/app/components/DetailTab.vue | 14 +-- .../src/views/spark/app/components/State.tsx | 2 +- .../src/views/spark/app/data/detail.data.ts | 4 +- .../src/views/spark/app/detail.vue | 4 +- .../views/spark/app/hooks/useSparkAction.tsx | 18 ++-- .../spark/app/hooks/useSparkTableAction.ts | 5 +- .../src/views/spark/app/index.vue | 7 -- .../e2e/cases/FlinkSQL116OnYarnTest.java | 6 +- .../e2e/cases/FlinkSQL117OnYarnTest.java | 6 +- .../e2e/cases/FlinkSQL118OnYarnTest.java | 6 +- .../flink/applications/ApplicationForm.java | 38 +++---- .../flink/client/bean/SubmitRequest.scala | 4 +- .../client/impl/YarnApplicationClient.scala | 4 +- .../flink/client/trait/FlinkClientTrait.scala | 10 +- .../packer/pipeline/PipelineTypeEnum.java | 4 +- .../flink/packer/pipeline/BuildRequest.scala | 22 ++-- .../impl/FlinkRemoteBuildPipeline.scala | 6 +- .../FlinkYarnApplicationBuildPipeline.scala | 4 +- ...ine.scala => SparkYarnBuildPipeline.scala} | 21 ++-- .../spark/client/bean/SubmitRequest.scala | 28 ++--- .../spark/client/trait/SparkClientTrait.scala | 2 +- 68 files changed, 372 insertions(+), 660 deletions(-) rename streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/{FlinkApplicationBuildPipelineController.java => FlinkBuildPipelineController.java} (98%) rename streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/{SparkApplicationBuildPipelineController.java => SparkBuildPipelineController.java} (98%) delete mode 100644 streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SparkProxyController.java delete mode 100644 streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkApplicationLog.java delete mode 100644 streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/SparkApplicationLogMapper.java delete mode 100644 streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/SparkApplicationLogService.java rename streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/{AppBuildPipeServiceImpl.java => FlinkAppBuildPipeServiceImpl.java} (98%) delete mode 100644 streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationLogServiceImpl.java rename streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/{SparkYarnApplicationBuildPipeline.scala => SparkYarnBuildPipeline.scala} (88%) diff --git a/streampark-common/src/main/java/org/apache/streampark/common/enums/FlinkJobType.java b/streampark-common/src/main/java/org/apache/streampark/common/enums/FlinkJobType.java index b30f498a60..b15d2c0976 100644 --- a/streampark-common/src/main/java/org/apache/streampark/common/enums/FlinkJobType.java +++ b/streampark-common/src/main/java/org/apache/streampark/common/enums/FlinkJobType.java @@ -52,9 +52,9 @@ public enum FlinkJobType { */ @Nonnull public static FlinkJobType of(@Nullable Integer value) { - for (FlinkJobType flinkDevelopmentMode : values()) { - if (flinkDevelopmentMode.mode.equals(value)) { - return flinkDevelopmentMode; + for (FlinkJobType flinkJobType : values()) { + if (flinkJobType.mode.equals(value)) { + return flinkJobType; } } return FlinkJobType.UNKNOWN; diff --git a/streampark-common/src/main/java/org/apache/streampark/common/enums/SparkJobType.java b/streampark-common/src/main/java/org/apache/streampark/common/enums/SparkJobType.java index cb6b4ed617..4e5114aff9 100644 --- a/streampark-common/src/main/java/org/apache/streampark/common/enums/SparkJobType.java +++ b/streampark-common/src/main/java/org/apache/streampark/common/enums/SparkJobType.java @@ -52,9 +52,9 @@ public enum SparkJobType { */ @Nonnull public static SparkJobType valueOf(@Nullable Integer value) { - for (SparkJobType sparkDevelopmentMode : values()) { - if (sparkDevelopmentMode.mode.equals(value)) { - return sparkDevelopmentMode; + for (SparkJobType sparkJobType : values()) { + if (sparkJobType.mode.equals(value)) { + return sparkJobType; } } return SparkJobType.UNKNOWN; diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/AppControl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/AppControl.java index 88ae16956b..289768412d 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/AppControl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/AppControl.java @@ -32,4 +32,6 @@ public class AppControl { /** allow to build the application */ private boolean allowBuild; + + private boolean allowView; } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkApplicationBuildPipelineController.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkBuildPipelineController.java similarity index 98% rename from streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkApplicationBuildPipelineController.java rename to streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkBuildPipelineController.java index 211b9c3aa7..04f0d420e8 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkApplicationBuildPipelineController.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkBuildPipelineController.java @@ -42,7 +42,7 @@ @Validated @RestController @RequestMapping("flink/pipe") -public class FlinkApplicationBuildPipelineController { +public class FlinkBuildPipelineController { @Autowired private AppBuildPipeService appBuildPipeService; diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ProxyController.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ProxyController.java index d5504d5aca..a449cc6670 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ProxyController.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ProxyController.java @@ -20,15 +20,17 @@ import org.apache.streampark.console.base.exception.ApiAlertException; import org.apache.streampark.console.core.entity.ApplicationLog; import org.apache.streampark.console.core.entity.FlinkApplication; +import org.apache.streampark.console.core.entity.SparkApplication; +import org.apache.streampark.console.core.enums.EngineTypeEnum; import org.apache.streampark.console.core.enums.UserTypeEnum; import org.apache.streampark.console.core.service.ProxyService; import org.apache.streampark.console.core.service.application.ApplicationLogService; import org.apache.streampark.console.core.service.application.FlinkApplicationManageService; +import org.apache.streampark.console.core.service.application.SparkApplicationManageService; import org.apache.streampark.console.core.util.ServiceHelper; import org.apache.streampark.console.system.entity.Member; import org.apache.streampark.console.system.entity.User; import org.apache.streampark.console.system.service.MemberService; -import org.apache.streampark.console.system.service.UserService; import org.apache.shiro.authz.annotation.RequiresPermissions; @@ -53,16 +55,16 @@ public class ProxyController { private ProxyService proxyService; @Autowired - private FlinkApplicationManageService applicationManageService; + private FlinkApplicationManageService flinkApplicationManageService; @Autowired - private ApplicationLogService logService; + private SparkApplicationManageService sparkApplicationManageService; @Autowired - private MemberService memberService; + private ApplicationLogService logService; @Autowired - private UserService userService; + private MemberService memberService; @GetMapping("{type}/{id}/assets/**") public ResponseEntity proxyFlinkAssets(HttpServletRequest request, @PathVariable("type") String type, @@ -79,15 +81,17 @@ public ResponseEntity proxyFlink(HttpServletRequest request, @PathVariable("t private ResponseEntity proxy(String type, HttpServletRequest request, Long id) throws Exception { ApplicationLog log; - FlinkApplication app; - switch (type) { case "flink": - app = applicationManageService.getApp(id); - checkProxyApp(app); - return proxyService.proxyFlink(request, app); - case "cluster": - return proxyService.proxyCluster(request, id); + FlinkApplication flinkApplication = flinkApplicationManageService.getApp(id); + checkProxyApp(flinkApplication.getTeamId()); + return proxyService.proxyFlink(request, flinkApplication); + case "spark": + SparkApplication sparkApplication = sparkApplicationManageService.getApp(id); + checkProxyApp(sparkApplication.getTeamId()); + return proxyService.proxySpark(request, sparkApplication); + case "flink_cluster": + return proxyService.proxyFlinkCluster(request, id); case "history": log = logService.getById(id); checkProxyAppLog(log); @@ -101,14 +105,12 @@ private ResponseEntity proxy(String type, HttpServletRequest request, Long id } } - private void checkProxyApp(FlinkApplication app) { - ApiAlertException.throwIfNull(app, "Invalid operation, application is invalid."); - + private void checkProxyApp(Long teamId) { User user = ServiceHelper.getLoginUser(); ApiAlertException.throwIfNull(user, "Permission denied, please login first."); if (user.getUserType() != UserTypeEnum.ADMIN) { - Member member = memberService.getByTeamIdUserName(app.getTeamId(), user.getUsername()); + Member member = memberService.getByTeamIdUserName(teamId, user.getUsername()); ApiAlertException.throwIfNull(member, "Permission denied, this job not created by the current user, And the job cannot be found in the current user's team."); } @@ -116,7 +118,13 @@ private void checkProxyApp(FlinkApplication app) { private void checkProxyAppLog(ApplicationLog log) { ApiAlertException.throwIfNull(log, "Invalid operation, The application log not found."); - FlinkApplication app = applicationManageService.getById(log.getAppId()); - checkProxyApp(app); + if (log.getJobType() == EngineTypeEnum.FLINK.getCode()) { + FlinkApplication app = flinkApplicationManageService.getById(log.getAppId()); + checkProxyApp(app.getTeamId()); + } + if (log.getJobType() == EngineTypeEnum.SPARK.getCode()) { + SparkApplication app = sparkApplicationManageService.getById(log.getAppId()); + checkProxyApp(app.getTeamId()); + } } } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SparkApplicationController.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SparkApplicationController.java index 2e181883de..e0849291f6 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SparkApplicationController.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SparkApplicationController.java @@ -23,15 +23,15 @@ import org.apache.streampark.console.base.domain.RestResponse; import org.apache.streampark.console.base.exception.InternalException; import org.apache.streampark.console.core.annotation.AppChangeEvent; +import org.apache.streampark.console.core.entity.ApplicationLog; import org.apache.streampark.console.core.entity.FlinkApplicationBackUp; import org.apache.streampark.console.core.entity.SparkApplication; -import org.apache.streampark.console.core.entity.SparkApplicationLog; import org.apache.streampark.console.core.enums.AppExistsStateEnum; import org.apache.streampark.console.core.service.ResourceService; +import org.apache.streampark.console.core.service.application.ApplicationLogService; import org.apache.streampark.console.core.service.application.FlinkApplicationBackUpService; import org.apache.streampark.console.core.service.application.SparkApplicationActionService; import org.apache.streampark.console.core.service.application.SparkApplicationInfoService; -import org.apache.streampark.console.core.service.application.SparkApplicationLogService; import org.apache.streampark.console.core.service.application.SparkApplicationManageService; import org.apache.shiro.authz.annotation.RequiresPermissions; @@ -69,7 +69,7 @@ public class SparkApplicationController { private FlinkApplicationBackUpService backUpService; @Autowired - private SparkApplicationLogService applicationLogService; + private ApplicationLogService applicationLogService; @Autowired private ResourceService resourceService; @@ -152,8 +152,8 @@ public RestResponse start(SparkApplication app) { @PostMapping("cancel") @RequiresPermissions("app:cancel") - public RestResponse stop(SparkApplication app) throws Exception { - applicationActionService.stop(app); + public RestResponse cancel(SparkApplication app) throws Exception { + applicationActionService.cancel(app); return RestResponse.success(); } @@ -202,8 +202,8 @@ public RestResponse backups(FlinkApplicationBackUp backUp, RestRequest request) } @PostMapping("opt_log") - public RestResponse optionlog(SparkApplicationLog applicationLog, RestRequest request) { - IPage applicationList = applicationLogService.getPage(applicationLog, request); + public RestResponse optionlog(ApplicationLog applicationLog, RestRequest request) { + IPage applicationList = applicationLogService.getPage(applicationLog, request); return RestResponse.success(applicationList); } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SparkApplicationBuildPipelineController.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SparkBuildPipelineController.java similarity index 98% rename from streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SparkApplicationBuildPipelineController.java rename to streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SparkBuildPipelineController.java index dfaadf8c6b..2efc0668b0 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SparkApplicationBuildPipelineController.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SparkBuildPipelineController.java @@ -39,7 +39,7 @@ @Validated @RestController @RequestMapping("spark/pipe") -public class SparkApplicationBuildPipelineController { +public class SparkBuildPipelineController { @Autowired private SparkAppBuildPipeService appBuildPipeService; diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SparkProxyController.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SparkProxyController.java deleted file mode 100644 index fa68395eab..0000000000 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SparkProxyController.java +++ /dev/null @@ -1,101 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.streampark.console.core.controller; - -import org.apache.streampark.console.base.exception.ApiAlertException; -import org.apache.streampark.console.core.entity.SparkApplication; -import org.apache.streampark.console.core.entity.SparkApplicationLog; -import org.apache.streampark.console.core.enums.UserTypeEnum; -import org.apache.streampark.console.core.service.ProxyService; -import org.apache.streampark.console.core.service.application.SparkApplicationLogService; -import org.apache.streampark.console.core.service.application.SparkApplicationManageService; -import org.apache.streampark.console.core.util.ServiceHelper; -import org.apache.streampark.console.system.entity.Member; -import org.apache.streampark.console.system.entity.User; -import org.apache.streampark.console.system.service.MemberService; - -import org.apache.shiro.authz.annotation.RequiresPermissions; - -import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.http.ResponseEntity; -import org.springframework.validation.annotation.Validated; -import org.springframework.web.bind.annotation.GetMapping; -import org.springframework.web.bind.annotation.PathVariable; -import org.springframework.web.bind.annotation.RequestMapping; -import org.springframework.web.bind.annotation.RestController; - -import javax.servlet.http.HttpServletRequest; - -@Slf4j -@Validated -@RestController -@RequestMapping("spark/proxy") -public class SparkProxyController { - - @Autowired - private ProxyService proxyService; - - @Autowired - private SparkApplicationManageService applicationManageService; - - @Autowired - private SparkApplicationLogService logService; - - @Autowired - private MemberService memberService; - - @GetMapping("{type}/{id}/**") - @RequiresPermissions("app:view") - public ResponseEntity proxySpark(HttpServletRequest request, @PathVariable("type") String type, - @PathVariable("id") Long id) throws Exception { - return proxy(type, request, id); - } - - private ResponseEntity proxy(String type, HttpServletRequest request, Long id) throws Exception { - SparkApplicationLog log; - - switch (type) { - case "yarn": - log = logService.getById(id); - checkProxyAppLog(log); - return proxyService.proxyYarn(request, log); - default: - return ResponseEntity.notFound().build(); - } - } - - private void checkProxyApp(SparkApplication app) { - ApiAlertException.throwIfNull(app, "Invalid operation, application is invalid."); - - User user = ServiceHelper.getLoginUser(); - ApiAlertException.throwIfNull(user, "Permission denied, please login first."); - - if (user.getUserType() != UserTypeEnum.ADMIN) { - Member member = memberService.getByTeamIdUserName(app.getTeamId(), user.getUsername()); - ApiAlertException.throwIfNull(member, - "Permission denied, this job not created by the current user, And the job cannot be found in the current user's team."); - } - } - - private void checkProxyAppLog(SparkApplicationLog log) { - ApiAlertException.throwIfNull(log, "Invalid operation, The application log not found."); - SparkApplication app = applicationManageService.getById(log.getAppId()); - checkProxyApp(app); - } -} diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkApplication.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkApplication.java index 3b9c86ffa9..9182879c50 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkApplication.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkApplication.java @@ -276,8 +276,8 @@ public void setState(Integer state) { } public void setYarnQueueByHotParams() { - if (!(FlinkDeployMode.YARN_APPLICATION == this.getFlinkDeployMode() - || FlinkDeployMode.YARN_PER_JOB == this.getFlinkDeployMode())) { + if (!(FlinkDeployMode.YARN_APPLICATION == this.getDeployModeEnum() + || FlinkDeployMode.YARN_PER_JOB == this.getDeployModeEnum())) { return; } @@ -341,7 +341,7 @@ public ReleaseStateEnum getReleaseState() { } @JsonIgnore - public FlinkJobType getDevelopmentMode() { + public FlinkJobType getJobTypeEnum() { return FlinkJobType.of(jobType); } @@ -356,7 +356,7 @@ public FlinkK8sRestExposedType getK8sRestExposedTypeEnum() { } @JsonIgnore - public FlinkDeployMode getFlinkDeployMode() { + public FlinkDeployMode getDeployModeEnum() { return FlinkDeployMode.of(deployMode); } @@ -400,7 +400,7 @@ public String getRemoteAppHome() { /** Automatically identify remoteAppHome or localAppHome based on app FlinkDeployMode */ @JsonIgnore public String getAppHome() { - switch (this.getFlinkDeployMode()) { + switch (this.getDeployModeEnum()) { case KUBERNETES_NATIVE_APPLICATION: case KUBERNETES_NATIVE_SESSION: case YARN_PER_JOB: @@ -412,7 +412,7 @@ public String getAppHome() { return getRemoteAppHome(); default: throw new UnsupportedOperationException( - "unsupported deployMode ".concat(getFlinkDeployMode().getName())); + "unsupported deployMode ".concat(getDeployModeEnum().getName())); } } @@ -558,7 +558,7 @@ public void updateHotParams(FlinkApplication appParam) { if (appParam != this) { this.hotParams = null; } - FlinkDeployMode deployModeEnum = appParam.getFlinkDeployMode(); + FlinkDeployMode deployModeEnum = appParam.getDeployModeEnum(); Map hotParams = new HashMap<>(0); if (needFillYarnQueueLabel(deployModeEnum)) { hotParams.putAll(YarnQueueLabelExpression.getQueueLabelMap(appParam.getYarnQueue())); @@ -594,7 +594,7 @@ public int hashCode() { } public boolean isKubernetesModeJob() { - return FlinkDeployMode.isKubernetesMode(this.getFlinkDeployMode()); + return FlinkDeployMode.isKubernetesMode(this.getDeployModeEnum()); } public static class SFunc { diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkApplicationBackUp.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkApplicationBackUp.java index 20712df0a1..1376ac46fc 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkApplicationBackUp.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkApplicationBackUp.java @@ -62,7 +62,7 @@ public FlinkApplicationBackUp(FlinkApplication application) { } private void renderPath(FlinkApplication application) { - switch (application.getFlinkDeployMode()) { + switch (application.getDeployModeEnum()) { case KUBERNETES_NATIVE_APPLICATION: case KUBERNETES_NATIVE_SESSION: case YARN_PER_JOB: @@ -80,7 +80,7 @@ private void renderPath(FlinkApplication application) { break; default: throw new UnsupportedOperationException( - "unsupported deployMode ".concat(application.getFlinkDeployMode().getName())); + "unsupported deployMode ".concat(application.getDeployModeEnum().getName())); } } } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkApplication.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkApplication.java index da64214016..7d58641368 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkApplication.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkApplication.java @@ -110,7 +110,7 @@ public class SparkApplication extends BaseEntity { * driver pod name for spark on K8s.(will be supported in the future) */ @TableField(updateStrategy = FieldStrategy.IGNORED) - private String appId; + private String clusterId; private String yarnQueue; @@ -233,8 +233,8 @@ public void setState(Integer state) { } public void resolveYarnQueue() { - if (!(SparkDeployMode.YARN_CLIENT == this.getSparkDeployMode() - || SparkDeployMode.YARN_CLUSTER == this.getSparkDeployMode())) { + if (!(SparkDeployMode.YARN_CLIENT == this.getDeployModeEnum() + || SparkDeployMode.YARN_CLUSTER == this.getDeployModeEnum())) { return; } if (StringUtils.isBlank(this.yarnQueue)) { @@ -312,7 +312,7 @@ public ReleaseStateEnum getReleaseState() { } @JsonIgnore - public SparkJobType getDevelopmentMode() { + public SparkJobType getJobTypeEnum() { return SparkJobType.valueOf(jobType); } @@ -322,7 +322,7 @@ public SparkAppStateEnum getStateEnum() { } @JsonIgnore - public SparkDeployMode getSparkDeployMode() { + public SparkDeployMode getDeployModeEnum() { return SparkDeployMode.of(deployMode); } @@ -351,7 +351,7 @@ public String getRemoteAppHome() { /** Automatically identify remoteAppHome or localAppHome based on app SparkDeployMode */ @JsonIgnore public String getAppHome() { - switch (this.getSparkDeployMode()) { + switch (this.getDeployModeEnum()) { case REMOTE: case LOCAL: return getLocalAppHome(); @@ -360,7 +360,7 @@ public String getAppHome() { return getRemoteAppHome(); default: throw new UnsupportedOperationException( - "unsupported deployMode ".concat(getSparkDeployMode().getName())); + "unsupported deployMode ".concat(getDeployModeEnum().getName())); } } @@ -509,7 +509,7 @@ public int hashCode() { public static class SFunc { public static final SFunction ID = SparkApplication::getId; - public static final SFunction APP_ID = SparkApplication::getAppId; + public static final SFunction APP_ID = SparkApplication::getClusterId; public static final SFunction START_TIME = SparkApplication::getStartTime; public static final SFunction END_TIME = SparkApplication::getEndTime; public static final SFunction DURATION = SparkApplication::getDuration; diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkApplicationBackUp.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkApplicationBackUp.java index bc51ce0795..dc16c8061e 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkApplicationBackUp.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkApplicationBackUp.java @@ -64,7 +64,7 @@ public SparkApplicationBackUp(SparkApplication application) { } private void renderPath(SparkApplication application) { - switch (application.getSparkDeployMode()) { + switch (application.getDeployModeEnum()) { case LOCAL: this.path = String.format( "%s/%d/%d", @@ -78,7 +78,7 @@ private void renderPath(SparkApplication application) { break; default: throw new UnsupportedOperationException( - "unsupported deployMode ".concat(application.getSparkDeployMode().getName())); + "unsupported deployMode ".concat(application.getDeployModeEnum().getName())); } } } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkApplicationLog.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkApplicationLog.java deleted file mode 100644 index 8c51c92ed5..0000000000 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkApplicationLog.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.streampark.console.core.entity; - -import com.baomidou.mybatisplus.annotation.IdType; -import com.baomidou.mybatisplus.annotation.TableId; -import com.baomidou.mybatisplus.annotation.TableName; -import lombok.Data; -import lombok.extern.slf4j.Slf4j; - -import java.util.Date; - -@Data -@TableName("t_spark_log") -@Slf4j -public class SparkApplicationLog { - - @TableId(type = IdType.AUTO) - private Long id; - /** appId */ - private Long appId; - /** applicationId */ - private String sparkAppId; - /** tracking url of current spark application */ - private String trackUrl; - /** start status */ - private Boolean success; - /** option name */ - private Integer optionName; - /** option time */ - private Date optionTime; - /** exception at the start */ - private String exception; - /** The user who operates the application */ - private Long userId; -} diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/DistributedTaskEnum.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/DistributedTaskEnum.java index f397a64fba..c2b2beb43a 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/DistributedTaskEnum.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/DistributedTaskEnum.java @@ -47,15 +47,10 @@ public enum DistributedTaskEnum { */ ABORT(4), - /** - * Stop the given application. - */ - STOP(5), - /** * Forces the given application to stop. */ - FORCED_STOP(6); + FORCED_STOP(5); private final int value; diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/SparkOperationEnum.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/SparkOperationEnum.java index 4d9ee1ae2b..54b0d44822 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/SparkOperationEnum.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/SparkOperationEnum.java @@ -25,7 +25,7 @@ @Getter public enum SparkOperationEnum { - RELEASE(0), START(1), STOP(2); + RELEASE(0), START(1), CANCEL(2); private final int value; diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/SparkApplicationLogMapper.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/SparkApplicationLogMapper.java deleted file mode 100644 index c9e77768a5..0000000000 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/SparkApplicationLogMapper.java +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.streampark.console.core.mapper; - -import org.apache.streampark.console.core.entity.SparkApplicationLog; - -import com.baomidou.mybatisplus.core.mapper.BaseMapper; - -public interface SparkApplicationLogMapper extends BaseMapper { -} diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ProxyService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ProxyService.java index c9f269cbf3..8722db5a63 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ProxyService.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ProxyService.java @@ -19,7 +19,7 @@ import org.apache.streampark.console.core.entity.ApplicationLog; import org.apache.streampark.console.core.entity.FlinkApplication; -import org.apache.streampark.console.core.entity.SparkApplicationLog; +import org.apache.streampark.console.core.entity.SparkApplication; import org.springframework.http.ResponseEntity; @@ -29,11 +29,11 @@ public interface ProxyService { ResponseEntity proxyFlink(HttpServletRequest request, FlinkApplication app) throws Exception; - ResponseEntity proxyYarn(HttpServletRequest request, SparkApplicationLog log) throws Exception; + ResponseEntity proxySpark(HttpServletRequest request, SparkApplication app) throws Exception; ResponseEntity proxyYarn(HttpServletRequest request, ApplicationLog log) throws Exception; ResponseEntity proxyHistory(HttpServletRequest request, ApplicationLog log) throws Exception; - ResponseEntity proxyCluster(HttpServletRequest request, Long clusterId) throws Exception; + ResponseEntity proxyFlinkCluster(HttpServletRequest request, Long clusterId) throws Exception; } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/SparkApplicationActionService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/SparkApplicationActionService.java index 682717cbec..5081be98e7 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/SparkApplicationActionService.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/SparkApplicationActionService.java @@ -59,7 +59,7 @@ public interface SparkApplicationActionService extends IService { - - /** - * Retrieves a page of {@link SparkApplicationLog} objects based on the provided parameters. - * - * @param sparkApplicationLog The {@link SparkApplicationLog} object containing the search criteria. - * @param request The {@link RestRequest} object used for pagination and sorting. - * @return An {@link IPage} containing the retrieved {@link SparkApplicationLog} objects. - */ - IPage getPage(SparkApplicationLog sparkApplicationLog, RestRequest request); - - /** - * remove application log by application id - * - * @param appId The id of the application to be removed - */ - void removeByAppId(Long appId); -} diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/AppBuildPipeServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkAppBuildPipeServiceImpl.java similarity index 98% rename from streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/AppBuildPipeServiceImpl.java rename to streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkAppBuildPipeServiceImpl.java index 3434e67db0..462892af12 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/AppBuildPipeServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkAppBuildPipeServiceImpl.java @@ -116,7 +116,7 @@ @Service @Slf4j @Transactional(propagation = Propagation.SUPPORTS, rollbackFor = Exception.class) -public class AppBuildPipeServiceImpl +public class FlinkAppBuildPipeServiceImpl extends ServiceImpl implements @@ -455,13 +455,13 @@ private BuildPipeline createPipelineInstance(@Nonnull FlinkApplication app) { } } - FlinkDeployMode deployModeEnum = app.getFlinkDeployMode(); + FlinkDeployMode deployModeEnum = app.getDeployModeEnum(); String mainClass = Constants.STREAMPARK_FLINKSQL_CLIENT_CLASS; switch (deployModeEnum) { case YARN_APPLICATION: String yarnProvidedPath = app.getAppLib(); String localWorkspace = app.getLocalAppHome().concat("/lib"); - if (FlinkJobType.CUSTOM_CODE == app.getDevelopmentMode() + if (FlinkJobType.CUSTOM_CODE == app.getJobTypeEnum() && ApplicationType.APACHE_FLINK == app.getApplicationType()) { yarnProvidedPath = app.getAppHome(); localWorkspace = app.getLocalAppHome(); @@ -490,7 +490,7 @@ private BuildPipeline createPipelineInstance(@Nonnull FlinkApplication app) { return FlinkK8sApplicationBuildPipeline.of(k8sApplicationBuildRequest); default: throw new UnsupportedOperationException( - "Unsupported Building Application for DeployMode: " + app.getFlinkDeployMode()); + "Unsupported Building Application for DeployMode: " + app.getDeployModeEnum()); } } @@ -505,7 +505,7 @@ private FlinkYarnApplicationBuildRequest buildFlinkYarnApplicationBuildRequest( mainClass, localWorkspace, yarnProvidedPath, - app.getDevelopmentMode(), + app.getJobTypeEnum(), getMergedDependencyInfo(app)); } @@ -521,8 +521,8 @@ private FlinkK8sApplicationBuildRequest buildFlinkK8sApplicationBuildRequest( app.getLocalAppHome(), mainClass, flinkUserJar, - app.getFlinkDeployMode(), - app.getDevelopmentMode(), + app.getDeployModeEnum(), + app.getJobTypeEnum(), flinkEnv.getFlinkVersion(), getMergedDependencyInfo(app), app.getJobName(), @@ -549,8 +549,8 @@ private FlinkK8sSessionBuildRequest buildFlinkK8sSessionBuildRequest( app.getLocalAppHome(), mainClass, flinkUserJar, - app.getFlinkDeployMode(), - app.getDevelopmentMode(), + app.getDeployModeEnum(), + app.getJobTypeEnum(), flinkEnv.getFlinkVersion(), getMergedDependencyInfo(app), app.getClusterId(), @@ -569,15 +569,15 @@ private FlinkRemotePerJobBuildRequest buildFlinkRemotePerJobBuildRequest( mainClass, flinkUserJar, app.isCustomCodeJob(), - app.getFlinkDeployMode(), - app.getDevelopmentMode(), + app.getDeployModeEnum(), + app.getJobTypeEnum(), flinkEnv.getFlinkVersion(), getMergedDependencyInfo(app)); } /** copy from {@link FlinkApplicationActionService#start(FlinkApplication, boolean)} */ private String retrieveFlinkUserJar(FlinkEnv flinkEnv, FlinkApplication app) { - switch (app.getDevelopmentMode()) { + switch (app.getJobTypeEnum()) { case CUSTOM_CODE: switch (app.getApplicationType()) { case STREAMPARK_FLINK: @@ -594,14 +594,14 @@ private String retrieveFlinkUserJar(FlinkEnv flinkEnv, FlinkApplication app) { return String.format("%s/%s", app.getAppHome(), app.getJar()); case FLINK_SQL: String sqlDistJar = ServiceHelper.getFlinkSqlClientJar(flinkEnv); - if (app.getFlinkDeployMode() == FlinkDeployMode.YARN_APPLICATION) { + if (app.getDeployModeEnum() == FlinkDeployMode.YARN_APPLICATION) { String clientPath = Workspace.remote().APP_CLIENT(); return String.format("%s/%s", clientPath, sqlDistJar); } return Workspace.local().APP_CLIENT().concat("/").concat(sqlDistJar); default: throw new UnsupportedOperationException( - "[StreamPark] unsupported JobType: " + app.getDevelopmentMode()); + "[StreamPark] unsupported JobType: " + app.getJobTypeEnum()); } } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationActionServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationActionServiceImpl.java index 582d803166..cd0344fdb7 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationActionServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationActionServiceImpl.java @@ -49,6 +49,7 @@ import org.apache.streampark.console.core.enums.CheckPointTypeEnum; import org.apache.streampark.console.core.enums.ConfigFileTypeEnum; import org.apache.streampark.console.core.enums.DistributedTaskEnum; +import org.apache.streampark.console.core.enums.EngineTypeEnum; import org.apache.streampark.console.core.enums.FlinkAppStateEnum; import org.apache.streampark.console.core.enums.OperationEnum; import org.apache.streampark.console.core.enums.OptionStateEnum; @@ -270,6 +271,7 @@ public void cancel(FlinkApplication appParam) throws Exception { application.setState(FlinkAppStateEnum.CANCELLING.getValue()); ApplicationLog applicationLog = new ApplicationLog(); + applicationLog.setJobType(EngineTypeEnum.FLINK.getCode()); applicationLog.setOptionName(OperationEnum.CANCEL.getValue()); applicationLog.setAppId(application.getId()); applicationLog.setTrackingUrl(application.getJobManagerUrl()); @@ -305,7 +307,7 @@ public void cancel(FlinkApplication appParam) throws Exception { Map properties = new HashMap<>(); - if (FlinkDeployMode.isRemoteMode(application.getFlinkDeployMode())) { + if (FlinkDeployMode.isRemoteMode(application.getDeployModeEnum())) { FlinkCluster cluster = flinkClusterService.getById(application.getFlinkClusterId()); ApiAlertException.throwIfNull( cluster, @@ -410,12 +412,12 @@ public void start(FlinkApplication appParam, boolean auto) throws Exception { ApiAlertException.throwIfTrue( !application.isCanBeStart(), "[StreamPark] The application cannot be started repeatedly."); - if (FlinkDeployMode.isRemoteMode(application.getFlinkDeployMode()) - || FlinkDeployMode.isSessionMode(application.getFlinkDeployMode())) { + if (FlinkDeployMode.isRemoteMode(application.getDeployModeEnum()) + || FlinkDeployMode.isSessionMode(application.getDeployModeEnum())) { checkBeforeStart(application); } - if (FlinkDeployMode.isYarnMode(application.getFlinkDeployMode())) { + if (FlinkDeployMode.isYarnMode(application.getDeployModeEnum())) { ApiAlertException.throwIfTrue( !applicationInfoService.getYarnAppReport(application.getJobName()).isEmpty(), "[StreamPark] The same task name is already running in the yarn queue"); @@ -457,7 +459,7 @@ public void start(FlinkApplication appParam, boolean auto) throws Exception { String appConf = userJarAndAppConf.t2; BuildResult buildResult = buildPipeline.getBuildResult(); - if (FlinkDeployMode.YARN_APPLICATION == application.getFlinkDeployMode()) { + if (FlinkDeployMode.YARN_APPLICATION == application.getDeployModeEnum()) { buildResult = new ShadedBuildResponse(null, flinkUserJar, true); } @@ -517,6 +519,7 @@ public void start(FlinkApplication appParam, boolean auto) throws Exception { @Nonnull private ApplicationLog constructAppLog(FlinkApplication application) { ApplicationLog applicationLog = new ApplicationLog(); + applicationLog.setJobType(EngineTypeEnum.FLINK.getCode()); applicationLog.setOptionName(OperationEnum.START.getValue()); applicationLog.setAppId(application.getId()); applicationLog.setOptionTime(new Date()); @@ -648,7 +651,7 @@ private void starting(FlinkApplication application) { } private Tuple2 getUserJarAndAppConf(FlinkEnv flinkEnv, FlinkApplication application) { - FlinkDeployMode deployModeEnum = application.getFlinkDeployMode(); + FlinkDeployMode deployModeEnum = application.getDeployModeEnum(); FlinkApplicationConfig applicationConfig = configService.getEffective(application.getId()); ApiAlertException.throwIfNull( @@ -657,7 +660,7 @@ private Tuple2 getUserJarAndAppConf(FlinkEnv flinkEnv, FlinkAppl String flinkUserJar = null; String appConf = null; - switch (application.getDevelopmentMode()) { + switch (application.getJobTypeEnum()) { case FLINK_SQL: FlinkSql flinkSql = flinkSqlService.getEffective(application.getId(), false); AssertUtils.notNull(flinkSql); @@ -751,7 +754,7 @@ private Tuple2 getUserJarAndAppConf(FlinkEnv flinkEnv, FlinkAppl private Map getProperties(FlinkApplication application, String runtimeProperties) { Map properties = new HashMap<>(application.getOptionMap()); - if (FlinkDeployMode.isRemoteMode(application.getFlinkDeployMode())) { + if (FlinkDeployMode.isRemoteMode(application.getDeployModeEnum())) { FlinkCluster cluster = flinkClusterService.getById(application.getFlinkClusterId()); ApiAlertException.throwIfNull( cluster, @@ -762,8 +765,8 @@ private Map getProperties(FlinkApplication application, String r URI activeAddress = cluster.getRemoteURI(); properties.put(RestOptions.ADDRESS.key(), activeAddress.getHost()); properties.put(RestOptions.PORT.key(), activeAddress.getPort()); - } else if (FlinkDeployMode.isYarnMode(application.getFlinkDeployMode())) { - if (FlinkDeployMode.YARN_SESSION == application.getFlinkDeployMode()) { + } else if (FlinkDeployMode.isYarnMode(application.getDeployModeEnum())) { + if (FlinkDeployMode.YARN_SESSION == application.getDeployModeEnum()) { FlinkCluster cluster = flinkClusterService.getById(application.getFlinkClusterId()); ApiAlertException.throwIfNull( cluster, @@ -780,7 +783,7 @@ private Map getProperties(FlinkApplication application, String r Optional.ofNullable(yarnLabelExpr) .ifPresent(yLabel -> properties.put(ConfigKeys.KEY_YARN_APP_NODE_LABEL(), yLabel)); } - } else if (FlinkDeployMode.isKubernetesMode(application.getFlinkDeployMode())) { + } else if (FlinkDeployMode.isKubernetesMode(application.getDeployModeEnum())) { properties.put(ConfigKeys.KEY_K8S_IMAGE_PULL_POLICY(), "Always"); } @@ -818,7 +821,7 @@ private void doAbort(Long id) { FlinkAppHttpWatcher.unWatching(application.getId()); } // kill application - if (FlinkDeployMode.isYarnMode(application.getFlinkDeployMode())) { + if (FlinkDeployMode.isYarnMode(application.getDeployModeEnum())) { try { List applications = applicationInfoService .getYarnAppReport(application.getJobName()); @@ -866,7 +869,7 @@ private Tuple3 getNamespaceClusterId( String clusterId = null; String k8sNamespace = null; FlinkK8sRestExposedType exposedType = null; - switch (application.getFlinkDeployMode()) { + switch (application.getDeployModeEnum()) { case YARN_APPLICATION: case YARN_PER_JOB: case YARN_SESSION: diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationInfoServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationInfoServiceImpl.java index dc596269d2..6927945973 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationInfoServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationInfoServiceImpl.java @@ -222,8 +222,8 @@ public boolean checkEnv(FlinkApplication appParam) throws ApplicationException { envInitializer.checkFlinkEnv(application.getStorageType(), flinkEnv); envInitializer.storageInitialize(application.getStorageType()); - if (FlinkDeployMode.YARN_SESSION == application.getFlinkDeployMode() - || FlinkDeployMode.REMOTE == application.getFlinkDeployMode()) { + if (FlinkDeployMode.YARN_SESSION == application.getDeployModeEnum() + || FlinkDeployMode.REMOTE == application.getDeployModeEnum()) { FlinkCluster flinkCluster = flinkClusterService.getById(application.getFlinkClusterId()); boolean conned = flinkClusterWatcher.verifyClusterConnection(flinkCluster); if (!conned) { @@ -370,7 +370,7 @@ public String k8sStartLog(Long id, Integer offset, Integer limit) throws Excepti ApiAlertException.throwIfNull( application, String.format("The application id=%s can't be found.", id)); ApiAlertException.throwIfFalse( - FlinkDeployMode.isKubernetesMode(application.getFlinkDeployMode()), + FlinkDeployMode.isKubernetesMode(application.getDeployModeEnum()), "Job deployMode must be kubernetes-session|kubernetes-application."); CompletableFuture future = CompletableFuture.supplyAsync( diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationManageServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationManageServiceImpl.java index 684b04dd56..6ddbf4b23d 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationManageServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationManageServiceImpl.java @@ -311,7 +311,8 @@ private AppControl getAppControl(FlinkApplication record) { !record.shouldTracking() && PipelineStatusEnum.success.getCode() .equals(record.getBuildStatus())) - .setAllowStop(record.isRunning()); + .setAllowStop(record.isRunning()) + .setAllowView(record.shouldTracking()); } @Override @@ -393,7 +394,7 @@ public Long copy(FlinkApplication appParam) { newApp.setJobName(jobName); newApp.setClusterId( - FlinkDeployMode.isSessionMode(persist.getFlinkDeployMode()) + FlinkDeployMode.isSessionMode(persist.getDeployModeEnum()) ? persist.getClusterId() : null); newApp.setArgs(appParam.getArgs() != null ? appParam.getArgs() : persist.getArgs()); @@ -475,7 +476,7 @@ public boolean update(FlinkApplication appParam) { FlinkApplication application = getById(appParam.getId()); /* If the original mode is remote, k8s-session, yarn-session, check cluster status */ - FlinkDeployMode flinkDeployMode = application.getFlinkDeployMode(); + FlinkDeployMode flinkDeployMode = application.getDeployModeEnum(); switch (flinkDeployMode) { case REMOTE: case YARN_SESSION: @@ -560,7 +561,7 @@ public boolean update(FlinkApplication appParam) { application.setCpMaxFailureInterval(appParam.getCpMaxFailureInterval()); application.setTags(appParam.getTags()); - switch (appParam.getFlinkDeployMode()) { + switch (appParam.getDeployModeEnum()) { case YARN_APPLICATION: application.setHadoopUser(appParam.getHadoopUser()); break; @@ -786,7 +787,7 @@ public FlinkApplication getApp(Long id) { */ @VisibleForTesting public boolean validateQueueIfNeeded(FlinkApplication appParam) { - yarnQueueService.checkQueueLabel(appParam.getFlinkDeployMode(), appParam.getYarnQueue()); + yarnQueueService.checkQueueLabel(appParam.getDeployModeEnum(), appParam.getYarnQueue()); if (!isYarnNotDefaultQueue(appParam)) { return true; } @@ -802,13 +803,13 @@ public boolean validateQueueIfNeeded(FlinkApplication appParam) { */ @VisibleForTesting public boolean validateQueueIfNeeded(FlinkApplication oldApp, FlinkApplication newApp) { - yarnQueueService.checkQueueLabel(newApp.getFlinkDeployMode(), newApp.getYarnQueue()); + yarnQueueService.checkQueueLabel(newApp.getDeployModeEnum(), newApp.getYarnQueue()); if (!isYarnNotDefaultQueue(newApp)) { return true; } oldApp.setYarnQueueByHotParams(); - if (FlinkDeployMode.isYarnPerJobOrAppMode(newApp.getFlinkDeployMode()) + if (FlinkDeployMode.isYarnPerJobOrAppMode(newApp.getDeployModeEnum()) && StringUtils.equals(oldApp.getYarnQueue(), newApp.getYarnQueue())) { return true; } @@ -824,7 +825,7 @@ public boolean validateQueueIfNeeded(FlinkApplication oldApp, FlinkApplication n * (empty or default), return true, false else. */ private boolean isYarnNotDefaultQueue(FlinkApplication application) { - return FlinkDeployMode.isYarnPerJobOrAppMode(application.getFlinkDeployMode()) + return FlinkDeployMode.isYarnPerJobOrAppMode(application.getDeployModeEnum()) && !yarnQueueService.isDefaultQueue(application.getYarnQueue()); } @@ -849,7 +850,7 @@ private boolean isK8sPodTemplateChanged(FlinkApplication application, FlinkAppli private boolean isYarnApplicationModeChange(FlinkApplication application, FlinkApplication appParam) { return !application.getDeployMode().equals(appParam.getDeployMode()) - && (FlinkDeployMode.YARN_APPLICATION == appParam.getFlinkDeployMode() - || FlinkDeployMode.YARN_APPLICATION == application.getFlinkDeployMode()); + && (FlinkDeployMode.YARN_APPLICATION == appParam.getDeployModeEnum() + || FlinkDeployMode.YARN_APPLICATION == application.getDeployModeEnum()); } } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkAppBuildPipeServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkAppBuildPipeServiceImpl.java index 063a96c0c7..167652d220 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkAppBuildPipeServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkAppBuildPipeServiceImpl.java @@ -30,14 +30,15 @@ import org.apache.streampark.console.base.util.WebUtils; import org.apache.streampark.console.core.bean.Dependency; import org.apache.streampark.console.core.entity.AppBuildPipeline; +import org.apache.streampark.console.core.entity.ApplicationLog; import org.apache.streampark.console.core.entity.Message; import org.apache.streampark.console.core.entity.Resource; import org.apache.streampark.console.core.entity.SparkApplication; import org.apache.streampark.console.core.entity.SparkApplicationConfig; -import org.apache.streampark.console.core.entity.SparkApplicationLog; import org.apache.streampark.console.core.entity.SparkEnv; import org.apache.streampark.console.core.entity.SparkSql; import org.apache.streampark.console.core.enums.CandidateTypeEnum; +import org.apache.streampark.console.core.enums.EngineTypeEnum; import org.apache.streampark.console.core.enums.NoticeTypeEnum; import org.apache.streampark.console.core.enums.OptionStateEnum; import org.apache.streampark.console.core.enums.ReleaseStateEnum; @@ -47,10 +48,10 @@ import org.apache.streampark.console.core.service.ResourceService; import org.apache.streampark.console.core.service.SparkEnvService; import org.apache.streampark.console.core.service.SparkSqlService; +import org.apache.streampark.console.core.service.application.ApplicationLogService; import org.apache.streampark.console.core.service.application.SparkAppBuildPipeService; import org.apache.streampark.console.core.service.application.SparkApplicationConfigService; import org.apache.streampark.console.core.service.application.SparkApplicationInfoService; -import org.apache.streampark.console.core.service.application.SparkApplicationLogService; import org.apache.streampark.console.core.service.application.SparkApplicationManageService; import org.apache.streampark.console.core.util.ServiceHelper; import org.apache.streampark.console.core.watcher.SparkAppHttpWatcher; @@ -61,8 +62,8 @@ import org.apache.streampark.flink.packer.pipeline.PipeWatcher; import org.apache.streampark.flink.packer.pipeline.PipelineSnapshot; import org.apache.streampark.flink.packer.pipeline.PipelineStatusEnum; -import org.apache.streampark.flink.packer.pipeline.SparkYarnApplicationBuildRequest; -import org.apache.streampark.flink.packer.pipeline.impl.SparkYarnApplicationBuildPipeline; +import org.apache.streampark.flink.packer.pipeline.SparkYarnBuildRequest; +import org.apache.streampark.flink.packer.pipeline.impl.SparkYarnBuildPipeline; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; @@ -111,13 +112,13 @@ public class SparkAppBuildPipeServiceImpl private MessageService messageService; @Autowired - private SparkApplicationManageService applicationManageService; + private ApplicationLogService applicationLogService; @Autowired - private SparkApplicationInfoService applicationInfoService; + private SparkApplicationManageService applicationManageService; @Autowired - private SparkApplicationLogService applicationLogService; + private SparkApplicationInfoService applicationInfoService; @Autowired private SparkAppHttpWatcher sparkAppHttpWatcher; @@ -145,7 +146,8 @@ public boolean buildApplication(@Nonnull Long appId, boolean forceBuild) { checkBuildEnv(appId, forceBuild); SparkApplication app = applicationManageService.getById(appId); - SparkApplicationLog applicationLog = new SparkApplicationLog(); + ApplicationLog applicationLog = new ApplicationLog(); + applicationLog.setJobType(EngineTypeEnum.SPARK.getCode()); applicationLog.setOptionName(RELEASE.getValue()); applicationLog.setAppId(app.getId()); applicationLog.setOptionTime(new Date()); @@ -370,34 +372,35 @@ private BuildPipeline createPipelineInstance(@Nonnull SparkApplication app) { } } - SparkDeployMode deployModeEnum = app.getSparkDeployMode(); + SparkDeployMode deployModeEnum = app.getDeployModeEnum(); String mainClass = Constants.STREAMPARK_SPARKSQL_CLIENT_CLASS; switch (deployModeEnum) { - case YARN_CLUSTER: case YARN_CLIENT: + case YARN_CLUSTER: String yarnProvidedPath = app.getAppLib(); String localWorkspace = app.getLocalAppHome().concat("/lib"); if (ApplicationType.APACHE_SPARK == app.getApplicationType()) { yarnProvidedPath = app.getAppHome(); localWorkspace = app.getLocalAppHome(); } - SparkYarnApplicationBuildRequest yarnAppRequest = new SparkYarnApplicationBuildRequest( + SparkYarnBuildRequest yarnAppRequest = new SparkYarnBuildRequest( app.getAppName(), mainClass, localWorkspace, yarnProvidedPath, - app.getDevelopmentMode(), + app.getJobTypeEnum(), + deployModeEnum, getMergedDependencyInfo(app)); log.info("Submit params to building pipeline : {}", yarnAppRequest); - return SparkYarnApplicationBuildPipeline.of(yarnAppRequest); + return SparkYarnBuildPipeline.of(yarnAppRequest); default: throw new UnsupportedOperationException( - "Unsupported Building Application for DeployMode: " + app.getSparkDeployMode()); + "Unsupported Building Application for DeployMode: " + app.getDeployModeEnum()); } } private String retrieveSparkUserJar(SparkEnv sparkEnv, SparkApplication app) { - switch (app.getDevelopmentMode()) { + switch (app.getJobTypeEnum()) { case SPARK_JAR: switch (app.getApplicationType()) { case STREAMPARK_SPARK: @@ -412,18 +415,16 @@ private String retrieveSparkUserJar(SparkEnv sparkEnv, SparkApplication app) { } case PYSPARK: return String.format("%s/%s", app.getAppHome(), app.getJar()); - case SPARK_SQL: String sqlDistJar = ServiceHelper.getSparkSqlClientJar(sparkEnv); - - if (app.getSparkDeployMode() == SparkDeployMode.YARN_CLUSTER) { + if (app.getDeployModeEnum() == SparkDeployMode.YARN_CLUSTER) { String clientPath = Workspace.remote().APP_CLIENT(); return String.format("%s/%s", clientPath, sqlDistJar); } return Workspace.local().APP_CLIENT().concat("/").concat(sqlDistJar); default: throw new UnsupportedOperationException( - "[StreamPark] unsupported JobType: " + app.getDevelopmentMode()); + "[StreamPark] unsupported JobType: " + app.getJobTypeEnum()); } } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationActionServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationActionServiceImpl.java index 40912e3be6..5586bbfe81 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationActionServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationActionServiceImpl.java @@ -32,14 +32,15 @@ import org.apache.streampark.console.base.exception.ApiAlertException; import org.apache.streampark.console.base.exception.ApplicationException; import org.apache.streampark.console.core.entity.AppBuildPipeline; +import org.apache.streampark.console.core.entity.ApplicationLog; import org.apache.streampark.console.core.entity.Resource; import org.apache.streampark.console.core.entity.SparkApplication; import org.apache.streampark.console.core.entity.SparkApplicationConfig; -import org.apache.streampark.console.core.entity.SparkApplicationLog; import org.apache.streampark.console.core.entity.SparkEnv; import org.apache.streampark.console.core.entity.SparkSql; import org.apache.streampark.console.core.enums.ConfigFileTypeEnum; import org.apache.streampark.console.core.enums.DistributedTaskEnum; +import org.apache.streampark.console.core.enums.EngineTypeEnum; import org.apache.streampark.console.core.enums.ReleaseStateEnum; import org.apache.streampark.console.core.enums.SparkAppStateEnum; import org.apache.streampark.console.core.enums.SparkOperationEnum; @@ -50,11 +51,11 @@ import org.apache.streampark.console.core.service.SparkEnvService; import org.apache.streampark.console.core.service.SparkSqlService; import org.apache.streampark.console.core.service.VariableService; +import org.apache.streampark.console.core.service.application.ApplicationLogService; import org.apache.streampark.console.core.service.application.SparkAppBuildPipeService; import org.apache.streampark.console.core.service.application.SparkApplicationActionService; import org.apache.streampark.console.core.service.application.SparkApplicationConfigService; import org.apache.streampark.console.core.service.application.SparkApplicationInfoService; -import org.apache.streampark.console.core.service.application.SparkApplicationLogService; import org.apache.streampark.console.core.util.ServiceHelper; import org.apache.streampark.console.core.watcher.SparkAppHttpWatcher; import org.apache.streampark.flink.packer.pipeline.BuildResult; @@ -114,7 +115,7 @@ public class SparkApplicationActionServiceImpl private SparkApplicationConfigService configService; @Autowired - private SparkApplicationLogService applicationLogService; + private ApplicationLogService applicationLogService; @Autowired private SparkEnvService sparkEnvService; @@ -176,7 +177,7 @@ public void restart(SparkApplication appParam) throws Exception { distributedTaskService.saveDistributedTask(appParam, false, DistributedTaskEnum.RESTART); return; } - this.stop(appParam); + this.cancel(appParam); this.start(appParam, false); } @@ -202,21 +203,22 @@ public void forcedStop(Long id) { } @Override - public void stop(SparkApplication appParam) throws Exception { + public void cancel(SparkApplication appParam) throws Exception { // For HA purposes, if the task is not processed locally, save the Distribution task and return if (!distributedTaskService.isLocalProcessing(appParam.getId())) { - distributedTaskService.saveDistributedTask(appParam, false, DistributedTaskEnum.STOP); + distributedTaskService.saveDistributedTask(appParam, false, DistributedTaskEnum.CANCEL); return; } SparkAppHttpWatcher.setOptionState(appParam.getId(), SparkOptionStateEnum.STOPPING); SparkApplication application = getById(appParam.getId()); application.setState(SparkAppStateEnum.STOPPING.getValue()); - SparkApplicationLog applicationLog = new SparkApplicationLog(); - applicationLog.setOptionName(SparkOperationEnum.STOP.getValue()); + ApplicationLog applicationLog = new ApplicationLog(); + applicationLog.setJobType(EngineTypeEnum.SPARK.getCode()); + applicationLog.setOptionName(SparkOperationEnum.CANCEL.getValue()); applicationLog.setAppId(application.getId()); applicationLog.setOptionTime(new Date()); - applicationLog.setSparkAppId(application.getAppId()); + applicationLog.setClusterId(application.getClusterId()); applicationLog.setUserId(ServiceHelper.getUserId()); application.setOptionTime(new Date()); this.baseMapper.updateById(application); @@ -236,7 +238,7 @@ public void stop(SparkApplication appParam) throws Exception { sparkEnv.getSparkVersion(), SparkDeployMode.of(application.getDeployMode()), stopProper, - application.getAppId()); + application.getClusterId()); CompletableFuture stopFuture = CompletableFuture.supplyAsync(() -> SparkClient.cancel(stopRequest), executorService); @@ -284,7 +286,7 @@ public void start(SparkApplication appParam, boolean auto) throws Exception { SparkEnv sparkEnv = sparkEnvService.getByIdOrDefault(application.getVersionId()); ApiAlertException.throwIfNull(sparkEnv, "[StreamPark] can no found spark version"); - if (SparkDeployMode.isYarnMode(application.getSparkDeployMode())) { + if (SparkDeployMode.isYarnMode(application.getDeployModeEnum())) { checkYarnBeforeStart(application); } @@ -304,7 +306,8 @@ public void start(SparkApplication appParam, boolean auto) throws Exception { // 2) update app state to starting... starting(application); - SparkApplicationLog applicationLog = new SparkApplicationLog(); + ApplicationLog applicationLog = new ApplicationLog(); + applicationLog.setJobType(EngineTypeEnum.SPARK.getCode()); applicationLog.setOptionName(SparkOperationEnum.START.getValue()); applicationLog.setAppId(application.getId()); applicationLog.setOptionTime(new Date()); @@ -327,7 +330,7 @@ public void start(SparkApplication appParam, boolean auto) throws Exception { String appConf = userJarAndAppConf.f1; BuildResult buildResult = buildPipeline.getBuildResult(); - if (SparkDeployMode.isYarnMode(application.getSparkDeployMode())) { + if (SparkDeployMode.isYarnMode(application.getDeployModeEnum())) { buildResult = new ShadedBuildResponse(null, sparkUserJar, true); if (StringUtils.isNotBlank(application.getYarnQueueName())) { extraParameter.put(ConfigKeys.KEY_SPARK_YARN_QUEUE_NAME(), application.getYarnQueueName()); @@ -387,10 +390,10 @@ public void start(SparkApplication appParam, boolean auto) throws Exception { applicationLog.setSuccess(true); application.resolveScheduleConf(response.sparkProperties()); if (StringUtils.isNoneEmpty(response.sparkAppId())) { - application.setAppId(response.sparkAppId()); + application.setClusterId(response.sparkAppId()); } - applicationLog.setSparkAppId(response.sparkAppId()); - applicationLog.setTrackUrl(response.trackingUrl()); + applicationLog.setClusterId(response.sparkAppId()); + applicationLog.setTrackingUrl(response.trackingUrl()); application.setStartTime(new Date()); application.setEndTime(null); @@ -438,7 +441,7 @@ private void starting(SparkApplication application) { private Tuple2 getUserJarAndAppConf( SparkEnv sparkEnv, SparkApplication application) { - SparkDeployMode deployModeEnum = application.getSparkDeployMode(); + SparkDeployMode deployModeEnum = application.getDeployModeEnum(); SparkApplicationConfig applicationConfig = configService.getEffective(application.getId()); ApiAlertException.throwIfNull( @@ -447,7 +450,7 @@ private Tuple2 getUserJarAndAppConf( String sparkUserJar = null; String appConf = null; - switch (application.getDevelopmentMode()) { + switch (application.getJobTypeEnum()) { case SPARK_SQL: SparkSql sparkSql = sparkSqlService.getEffective(application.getId(), false); AssertUtils.notNull(sparkSql); @@ -550,7 +553,7 @@ private void doStopped(Long id) { updateById(application); SparkAppHttpWatcher.unWatching(application.getId()); // kill application - if (SparkDeployMode.isYarnMode(application.getSparkDeployMode())) { + if (SparkDeployMode.isYarnMode(application.getDeployModeEnum())) { try { List applications = applicationInfoService .getYarnAppReport(application.getAppName()); diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationConfigServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationConfigServiceImpl.java index 3d00aa68c0..592d8ab2f7 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationConfigServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationConfigServiceImpl.java @@ -104,6 +104,9 @@ public void setLatest(Long appId, Long configId) { @Override public synchronized void update(SparkApplication appParam, Boolean latest) { // spark sql job + if (appParam.getConfig() == null) { + return; + } SparkApplicationConfig latestConfig = getLatest(appParam.getId()); if (appParam.isSparkSqlJob()) { updateForSparkSqlJob(appParam, latest, latestConfig); @@ -133,15 +136,15 @@ private void updateForNonSparkSqlJob(SparkApplication appParam, Boolean latest, } } else { SparkApplicationConfig config = getEffective(appParam.getId()); - if (config != null) { + if (config == null) { + this.create(appParam, latest); + } else { String decode = new String(Base64.getDecoder().decode(appParam.getConfig())); String encode = DeflaterUtils.zipString(decode.trim()); // create... if (!config.getContent().equals(encode)) { this.create(appParam, latest); } - } else { - this.create(appParam, latest); } } } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationLogServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationLogServiceImpl.java deleted file mode 100644 index 34ce31cd38..0000000000 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationLogServiceImpl.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.streampark.console.core.service.application.impl; - -import org.apache.streampark.console.base.domain.RestRequest; -import org.apache.streampark.console.base.mybatis.pager.MybatisPager; -import org.apache.streampark.console.core.entity.SparkApplicationLog; -import org.apache.streampark.console.core.mapper.SparkApplicationLogMapper; -import org.apache.streampark.console.core.service.application.SparkApplicationLogService; - -import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; -import com.baomidou.mybatisplus.core.metadata.IPage; -import com.baomidou.mybatisplus.extension.plugins.pagination.Page; -import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; -import lombok.extern.slf4j.Slf4j; -import org.springframework.stereotype.Service; -import org.springframework.transaction.annotation.Propagation; -import org.springframework.transaction.annotation.Transactional; - -@Slf4j -@Service -@Transactional(propagation = Propagation.SUPPORTS, readOnly = true, rollbackFor = Exception.class) -public class SparkApplicationLogServiceImpl extends ServiceImpl - implements - SparkApplicationLogService { - - @Override - public IPage getPage(SparkApplicationLog sparkApplicationLog, RestRequest request) { - request.setSortField("option_time"); - Page page = MybatisPager.getPage(request); - LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper() - .eq(SparkApplicationLog::getAppId, sparkApplicationLog.getAppId()); - return this.page(page, queryWrapper); - } - - @Override - public void removeByAppId(Long appId) { - LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper() - .eq(SparkApplicationLog::getAppId, appId); - this.remove(queryWrapper); - } -} diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationManageServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationManageServiceImpl.java index 8f35545435..c138ce428a 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationManageServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationManageServiceImpl.java @@ -46,10 +46,10 @@ import org.apache.streampark.console.core.service.SparkSqlService; import org.apache.streampark.console.core.service.YarnQueueService; import org.apache.streampark.console.core.service.application.AppBuildPipeService; +import org.apache.streampark.console.core.service.application.ApplicationLogService; import org.apache.streampark.console.core.service.application.ApplicationService; import org.apache.streampark.console.core.service.application.SparkApplicationBackUpService; import org.apache.streampark.console.core.service.application.SparkApplicationConfigService; -import org.apache.streampark.console.core.service.application.SparkApplicationLogService; import org.apache.streampark.console.core.service.application.SparkApplicationManageService; import org.apache.streampark.console.core.util.ServiceHelper; import org.apache.streampark.flink.packer.pipeline.PipelineStatusEnum; @@ -107,7 +107,7 @@ public class SparkApplicationManageServiceImpl private SparkApplicationConfigService configService; @Autowired - private SparkApplicationLogService applicationLogService; + private ApplicationLogService applicationLogService; @Autowired private SparkSqlService sparkSqlService; @@ -153,9 +153,7 @@ public void persistMetrics(SparkApplication appParam) { @Override public boolean mapping(SparkApplication appParam) { - boolean mapping = this.baseMapper.mapping(appParam); - SparkApplication application = getById(appParam.getId()); - return mapping; + return this.baseMapper.mapping(appParam); } @Override @@ -237,7 +235,8 @@ record -> { && PipelineStatusEnum.success .getCode() .equals(record.getBuildStatus())) - .setAllowStop(record.isRunning()); + .setAllowStop(record.isRunning()) + .setAllowView(record.shouldTracking()); record.setAppControl(appControl); }) .collect(Collectors.toList()); @@ -396,7 +395,7 @@ public boolean update(SparkApplication appParam) { SparkApplication application = getById(appParam.getId()); /* If the original mode is remote, k8s-session, yarn-session, check cluster status */ - SparkDeployMode sparkDeployMode = application.getSparkDeployMode(); + SparkDeployMode sparkDeployMode = application.getDeployModeEnum(); boolean success = validateQueueIfNeeded(application, appParam); ApiAlertException.throwIfFalse( @@ -460,7 +459,7 @@ public boolean update(SparkApplication appParam) { application.setRestartSize(appParam.getRestartSize()); application.setTags(appParam.getTags()); - switch (appParam.getSparkDeployMode()) { + switch (appParam.getDeployModeEnum()) { case YARN_CLUSTER: case YARN_CLIENT: application.setHadoopUser(appParam.getHadoopUser()); @@ -655,7 +654,7 @@ public SparkApplication getApp(Long id) { */ @VisibleForTesting public boolean validateQueueIfNeeded(SparkApplication appParam) { - yarnQueueService.checkQueueLabel(appParam.getSparkDeployMode(), appParam.getYarnQueue()); + yarnQueueService.checkQueueLabel(appParam.getDeployModeEnum(), appParam.getYarnQueue()); if (!isYarnNotDefaultQueue(appParam)) { return true; } @@ -671,13 +670,13 @@ public boolean validateQueueIfNeeded(SparkApplication appParam) { */ @VisibleForTesting public boolean validateQueueIfNeeded(SparkApplication oldApp, SparkApplication newApp) { - yarnQueueService.checkQueueLabel(newApp.getSparkDeployMode(), newApp.getYarnQueue()); + yarnQueueService.checkQueueLabel(newApp.getDeployModeEnum(), newApp.getYarnQueue()); if (!isYarnNotDefaultQueue(newApp)) { return true; } oldApp.resolveYarnQueue(); - if (SparkDeployMode.isYarnMode(newApp.getSparkDeployMode()) + if (SparkDeployMode.isYarnMode(newApp.getDeployModeEnum()) && StringUtils.equals(oldApp.getYarnQueue(), newApp.getYarnQueue())) { return true; } @@ -693,14 +692,14 @@ public boolean validateQueueIfNeeded(SparkApplication oldApp, SparkApplication n * (empty or default), return true, false else. */ private boolean isYarnNotDefaultQueue(SparkApplication application) { - return SparkDeployMode.isYarnMode(application.getSparkDeployMode()) + return SparkDeployMode.isYarnMode(application.getDeployModeEnum()) && !yarnQueueService.isDefaultQueue(application.getYarnQueue()); } private boolean isYarnApplicationModeChange( SparkApplication application, SparkApplication appParam) { return !application.getDeployMode().equals(appParam.getDeployMode()) - && (SparkDeployMode.YARN_CLIENT == appParam.getSparkDeployMode() - || SparkDeployMode.YARN_CLUSTER == application.getSparkDeployMode()); + && (SparkDeployMode.YARN_CLIENT == appParam.getDeployModeEnum() + || SparkDeployMode.YARN_CLUSTER == application.getDeployModeEnum()); } } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/DistributedTaskServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/DistributedTaskServiceImpl.java index dfff3440ac..fa5f56ab1c 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/DistributedTaskServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/DistributedTaskServiceImpl.java @@ -159,8 +159,8 @@ public void executeDistributedTask(DistributedTask distributedTask) throws Excep case REVOKE: sparkApplicationActionService.revoke(appParam.getId()); break; - case STOP: - sparkApplicationActionService.stop(appParam); + case CANCEL: + sparkApplicationActionService.cancel(appParam); break; case FORCED_STOP: sparkApplicationActionService.forcedStop(appParam.getId()); diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSavepointServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSavepointServiceImpl.java index 4191d316de..3a93e42f73 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSavepointServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSavepointServiceImpl.java @@ -32,6 +32,7 @@ import org.apache.streampark.console.core.entity.FlinkEnv; import org.apache.streampark.console.core.entity.FlinkSavepoint; import org.apache.streampark.console.core.enums.CheckPointTypeEnum; +import org.apache.streampark.console.core.enums.EngineTypeEnum; import org.apache.streampark.console.core.enums.OperationEnum; import org.apache.streampark.console.core.enums.OptionStateEnum; import org.apache.streampark.console.core.mapper.FlinkSavepointMapper; @@ -189,6 +190,7 @@ public void trigger(Long appId, @Nullable String savepointPath, @Nullable Boolea @Nonnull private ApplicationLog getApplicationLog(FlinkApplication application) { ApplicationLog applicationLog = new ApplicationLog(); + applicationLog.setJobType(EngineTypeEnum.FLINK.getCode()); applicationLog.setOptionName(OperationEnum.SAVEPOINT.getValue()); applicationLog.setAppId(application.getId()); applicationLog.setTrackingUrl(application.getJobManagerUrl()); @@ -292,7 +294,7 @@ private String getFinalSavepointDir(@Nullable String savepointPath, FlinkApplica private Map tryGetRestProps(FlinkApplication application, FlinkCluster cluster) { Map properties = new HashMap<>(); - if (FlinkDeployMode.isRemoteMode(application.getFlinkDeployMode())) { + if (FlinkDeployMode.isRemoteMode(application.getDeployModeEnum())) { AssertUtils.notNull( cluster, String.format( @@ -311,7 +313,7 @@ private String getClusterId(FlinkApplication application, FlinkCluster cluster) ? cluster.getClusterId() : application.getClusterId(); } else if (FlinkDeployMode.isYarnMode(application.getDeployMode())) { - if (FlinkDeployMode.YARN_SESSION.equals(application.getFlinkDeployMode())) { + if (FlinkDeployMode.YARN_SESSION.equals(application.getDeployModeEnum())) { AssertUtils.notNull( cluster, String.format( @@ -493,7 +495,7 @@ private TriggerSavepointRequest renderTriggerSavepointRequest( return new TriggerSavepointRequest( application.getId(), flinkEnv.getFlinkVersion(), - application.getFlinkDeployMode(), + application.getDeployModeEnum(), properties, clusterId, application.getJobId(), diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProxyServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProxyServiceImpl.java index 4b492fc087..59dbd3004d 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProxyServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProxyServiceImpl.java @@ -22,7 +22,7 @@ import org.apache.streampark.console.core.entity.ApplicationLog; import org.apache.streampark.console.core.entity.FlinkApplication; import org.apache.streampark.console.core.entity.FlinkCluster; -import org.apache.streampark.console.core.entity.SparkApplicationLog; +import org.apache.streampark.console.core.entity.SparkApplication; import org.apache.streampark.console.core.service.FlinkClusterService; import org.apache.streampark.console.core.service.ProxyService; import org.apache.streampark.console.core.watcher.FlinkK8sWatcherWrapper; @@ -95,9 +95,8 @@ public void handleError(@Nonnull ClientHttpResponse response) { @Override public ResponseEntity proxyFlink(HttpServletRequest request, FlinkApplication app) throws Exception { ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE); - String url = null; - switch (app.getFlinkDeployMode()) { + switch (app.getDeployModeEnum()) { case YARN_PER_JOB: case YARN_APPLICATION: case YARN_SESSION: @@ -115,7 +114,7 @@ public ResponseEntity proxyFlink(HttpServletRequest request, FlinkApplication break; default: throw new UnsupportedOperationException( - "unsupported deployMode ".concat(app.getFlinkDeployMode().getName())); + "unsupported deployMode ".concat(app.getDeployModeEnum().getName())); } if (url == null) { @@ -129,22 +128,25 @@ public ResponseEntity proxyFlink(HttpServletRequest request, FlinkApplication } @Override - public ResponseEntity proxyYarn(HttpServletRequest request, ApplicationLog log) throws Exception { - ResponseEntity.BodyBuilder builder = ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE); - String yarnId = log.getClusterId(); - if (StringUtils.isBlank(yarnId)) { - return builder.body("The yarn application id is null."); + public ResponseEntity proxySpark(HttpServletRequest request, SparkApplication app) throws Exception { + ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE); + switch (app.getDeployModeEnum()) { + case YARN_CLIENT: + case YARN_CLUSTER: + String yarnURL = YarnUtils.getRMWebAppProxyURL(); + String url = yarnURL + "/proxy/" + app.getClusterId(); + url += getRequestURL(request, "/proxy/spark/" + app.getId()); + return proxyYarnRequest(request, url); + default: + throw new UnsupportedOperationException( + "unsupported deployMode ".concat(app.getDeployModeEnum().getName())); } - String yarnURL = YarnUtils.getRMWebAppProxyURL(); - String url = yarnURL + "/proxy/" + yarnId + "/"; - url += getRequestURL(request, "/proxy/yarn/" + log.getId()); - return proxyYarnRequest(request, url); } @Override - public ResponseEntity proxyYarn(HttpServletRequest request, SparkApplicationLog log) throws Exception { + public ResponseEntity proxyYarn(HttpServletRequest request, ApplicationLog log) throws Exception { ResponseEntity.BodyBuilder builder = ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE); - String yarnId = log.getSparkAppId(); + String yarnId = log.getClusterId(); if (StringUtils.isBlank(yarnId)) { return builder.body("The yarn application id is null."); } @@ -167,7 +169,7 @@ public ResponseEntity proxyHistory(HttpServletRequest request, ApplicationLog } @Override - public ResponseEntity proxyCluster(HttpServletRequest request, Long clusterId) throws Exception { + public ResponseEntity proxyFlinkCluster(HttpServletRequest request, Long clusterId) throws Exception { ResponseEntity.BodyBuilder builder = ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE); FlinkCluster cluster = flinkClusterService.getById(clusterId); if (cluster == null) { @@ -178,7 +180,7 @@ public ResponseEntity proxyCluster(HttpServletRequest request, Long clusterId return builder.body("The cluster address is invalid."); } - url += getRequestURL(request, "/proxy/cluster/" + clusterId); + url += getRequestURL(request, "/proxy/flink_cluster/" + clusterId); switch (cluster.getFlinkDeployModeEnum()) { case YARN_PER_JOB: case YARN_APPLICATION: diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/utils/AlertTemplateUtils.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/utils/AlertTemplateUtils.java index 5b9675a5ab..a255037331 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/utils/AlertTemplateUtils.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/utils/AlertTemplateUtils.java @@ -44,7 +44,7 @@ public static AlertTemplate createAlertTemplate(FlinkApplication application, Fl return AlertTemplate.builder() .duration(application.getStartTime(), application.getEndTime()) .jobName(application.getJobName()) - .link(application.getFlinkDeployMode(), application.getClusterId()) + .link(application.getDeployModeEnum(), application.getClusterId()) .startTime(application.getStartTime()) .endTime(application.getEndTime()) .restart(application.isNeedRestartOnFailed(), application.getRestartCount()) @@ -66,7 +66,7 @@ public static AlertTemplate createAlertTemplate(FlinkApplication application, Ch return AlertTemplate.builder() .duration(application.getStartTime(), application.getEndTime()) .jobName(application.getJobName()) - .link(application.getFlinkDeployMode(), application.getClusterId()) + .link(application.getDeployModeEnum(), application.getClusterId()) .startTime(application.getStartTime()) .type(AlertTypeEnum.DING_TALK.getCode()) .cpFailureRateInterval( @@ -120,7 +120,7 @@ public static AlertTemplate createAlertTemplate(SparkApplication application, Sp return AlertTemplate.builder() .duration(application.getStartTime(), application.getEndTime()) .jobName(application.getAppName()) - .link(application.getSparkDeployMode(), application.getAppId()) + .link(application.getDeployModeEnum(), application.getClusterId()) .startTime(application.getStartTime()) .endTime(application.getEndTime()) .restart(application.isNeedRestartOnFailed(), application.getRestartCount()) diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java index 642e09b102..77444ef031 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java @@ -302,7 +302,7 @@ private void getStateFromFlink(FlinkApplication application) throws Exception { StopFromEnum stopFrom = getStopFrom(application); JobsOverview jobsOverview = httpJobsOverview(application); Optional optional; - FlinkDeployMode deployMode = application.getFlinkDeployMode(); + FlinkDeployMode deployMode = application.getDeployModeEnum(); if (FlinkDeployMode.YARN_APPLICATION.equals(deployMode) || FlinkDeployMode.YARN_PER_JOB.equals(deployMode)) { if (jobsOverview.getJobs() != null) { @@ -579,7 +579,7 @@ private void getStateFromYarn(FlinkApplication application) throws Exception { // query the status from the yarn rest Api YarnAppInfo yarnAppInfo = httpYarnAppInfo(application); if (yarnAppInfo == null) { - if (!FlinkDeployMode.REMOTE.equals(application.getFlinkDeployMode())) { + if (!FlinkDeployMode.REMOTE.equals(application.getDeployModeEnum())) { throw new RuntimeException( "[StreamPark][FlinkAppHttpWatcher] getFromYarnRestApi failed "); } @@ -618,7 +618,7 @@ private void getStateFromYarn(FlinkApplication application) throws Exception { } } } catch (Exception e) { - if (!FlinkDeployMode.REMOTE.equals(application.getFlinkDeployMode())) { + if (!FlinkDeployMode.REMOTE.equals(application.getDeployModeEnum())) { throw new RuntimeException( "[StreamPark][FlinkAppHttpWatcher] getFromYarnRestApi error,", e); } @@ -730,8 +730,8 @@ private YarnAppInfo httpYarnAppInfo(FlinkApplication application) throws Excepti private Overview httpOverview(FlinkApplication application) throws IOException { String appId = application.getClusterId(); if (appId != null) { - if (application.getFlinkDeployMode().equals(FlinkDeployMode.YARN_APPLICATION) - || application.getFlinkDeployMode().equals(FlinkDeployMode.YARN_PER_JOB)) { + if (application.getDeployModeEnum().equals(FlinkDeployMode.YARN_APPLICATION) + || application.getDeployModeEnum().equals(FlinkDeployMode.YARN_PER_JOB)) { String reqURL; if (StringUtils.isEmpty(application.getJobManagerUrl())) { String format = "proxy/%s/overview"; @@ -748,7 +748,7 @@ private Overview httpOverview(FlinkApplication application) throws IOException { private JobsOverview httpJobsOverview(FlinkApplication application) throws Exception { final String flinkUrl = "jobs/overview"; - FlinkDeployMode deployMode = application.getFlinkDeployMode(); + FlinkDeployMode deployMode = application.getDeployModeEnum(); if (FlinkDeployMode.isYarnMode(deployMode)) { String reqURL; if (StringUtils.isEmpty(application.getJobManagerUrl())) { @@ -782,7 +782,7 @@ private JobsOverview httpJobsOverview(FlinkApplication application) throws Excep private CheckPoints httpCheckpoints(FlinkApplication application) throws Exception { final String flinkUrl = "jobs/%s/checkpoints"; - FlinkDeployMode deployMode = application.getFlinkDeployMode(); + FlinkDeployMode deployMode = application.getDeployModeEnum(); if (FlinkDeployMode.isYarnMode(deployMode)) { String reqURL; if (StringUtils.isEmpty(application.getJobManagerUrl())) { diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/SparkAppHttpWatcher.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/SparkAppHttpWatcher.java index 696864687a..1ebad767ad 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/SparkAppHttpWatcher.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/SparkAppHttpWatcher.java @@ -29,11 +29,9 @@ import org.apache.streampark.console.core.metrics.spark.SparkApplicationSummary; import org.apache.streampark.console.core.metrics.yarn.YarnAppInfo; import org.apache.streampark.console.core.service.DistributedTaskService; -import org.apache.streampark.console.core.service.SparkEnvService; import org.apache.streampark.console.core.service.alert.AlertService; import org.apache.streampark.console.core.service.application.SparkApplicationActionService; import org.apache.streampark.console.core.service.application.SparkApplicationInfoService; -import org.apache.streampark.console.core.service.application.SparkApplicationLogService; import org.apache.streampark.console.core.service.application.SparkApplicationManageService; import org.apache.streampark.console.core.utils.AlertTemplateUtils; @@ -80,12 +78,6 @@ public class SparkAppHttpWatcher { @Autowired private SparkApplicationInfoService applicationInfoService; - @Autowired - private SparkApplicationLogService applicationLogService; - - @Autowired - private SparkEnvService sparkEnvService; - @Autowired private DistributedTaskService distributedTaskService; @@ -227,7 +219,7 @@ private void getStateFromYarn(SparkApplication application) throws Exception { log.info( "[StreamPark][SparkAppHttpWatcher] getStateFromYarn, app {} was ended, appId is {}, state is {}", application.getId(), - application.getAppId(), + application.getClusterId(), sparkAppStateEnum); application.setEndTime(new Date()); if (appFinalStatus.equals(FinalApplicationStatus.FAILED)) { @@ -329,12 +321,12 @@ public static Collection getWatchingApps() { } private YarnAppInfo httpYarnAppInfo(SparkApplication application) throws Exception { - String reqURL = "ws/v1/cluster/apps/".concat(application.getAppId()); + String reqURL = "ws/v1/cluster/apps/".concat(application.getClusterId()); return yarnRestRequest(reqURL, YarnAppInfo.class); } private Job[] httpJobsStatus(SparkApplication application) throws IOException { String format = "proxy/%s/api/v1/applications/%s/jobs"; - String reqURL = String.format(format, application.getAppId(), application.getAppId()); + String reqURL = String.format(format, application.getClusterId(), application.getClusterId()); return yarnRestRequest(reqURL, Job[].class); } diff --git a/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql b/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql index dd2de39867..623b9a1396 100644 --- a/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql +++ b/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql @@ -570,7 +570,7 @@ create table if not exists `t_spark_app` ( `jar_check_sum` bigint default null, `app_properties` text comment 'Arbitrary Spark configuration property in key=value format (e.g. spark.driver.cores=1)', `app_args` text comment 'Arguments passed to the main method of your main class', - `app_id` varchar(64) default null comment '(1)application_id on yarn(2)driver_pod_name on k8s', + `cluster_id` varchar(64) default null comment '(1)application_id on yarn(2)driver_pod_name on k8s', `yarn_queue` varchar(128) default null, `k8s_master_url` varchar(128) default null, `k8s_container_image` varchar(128) default null, @@ -610,24 +610,6 @@ create table if not exists `t_spark_app` ( primary key(`id`) ); - --- ---------------------------- --- Table structure for t_spark_log --- ---------------------------- -create table if not exists `t_spark_log` ( - `id` bigint generated by default as identity not null, - `app_id` bigint default null, - `spark_app_id` varchar(64) default null, - `track_url` varchar(255) default null, - `success` tinyint default null, - `exception` text , - `option_time` datetime default null, - `option_name` tinyint default null, - `user_id` bigint default null comment 'operator user id', - primary key(`id`) -); - - -- ---------------------------- -- Table structure for t_spark_effective -- ---------------------------- diff --git a/streampark-console/streampark-console-service/src/main/resources/mapper/core/SparkApplicationMapper.xml b/streampark-console/streampark-console-service/src/main/resources/mapper/core/SparkApplicationMapper.xml index 037a2c1861..f1011cb9b6 100644 --- a/streampark-console/streampark-console-service/src/main/resources/mapper/core/SparkApplicationMapper.xml +++ b/streampark-console/streampark-console-service/src/main/resources/mapper/core/SparkApplicationMapper.xml @@ -53,8 +53,8 @@ and t.app_name like concat('%', '${app.appName}', '%') - - and t.app_id = #{app.appId} + + and t.cluster_id = #{app.clusterId} and t.state = #{app.state} @@ -92,8 +92,8 @@ update t_spark_app - - app_id=#{app.appId}, + + cluster_id=#{app.clusterId}, end_time=null, state=14, @@ -117,8 +117,8 @@ update t_spark_app - - app_id=#{app.appId}, + + cluster_id=#{app.clusterId}, tracking=#{app.tracking}, diff --git a/streampark-console/streampark-console-webapp/src/api/spark/app.type.ts b/streampark-console/streampark-console-webapp/src/api/spark/app.type.ts index 574d0036a2..41704111b6 100644 --- a/streampark-console/streampark-console-webapp/src/api/spark/app.type.ts +++ b/streampark-console/streampark-console-webapp/src/api/spark/app.type.ts @@ -53,6 +53,7 @@ export interface SparkApplication { appProperties?: string; appArgs?: string; appId?: string; + clusterId?: string; yarnQueue?: string; yarnQueueName?: string; yarnQueueLabel?: string; diff --git a/streampark-console/streampark-console-webapp/src/locales/lang/en/flink/app.ts b/streampark-console/streampark-console-webapp/src/locales/lang/en/flink/app.ts index 6d8929c177..79fa2669f5 100644 --- a/streampark-console/streampark-console-webapp/src/locales/lang/en/flink/app.ts +++ b/streampark-console/streampark-console-webapp/src/locales/lang/en/flink/app.ts @@ -111,7 +111,7 @@ export default { lost: 'LOST', }, detail: { - detailTitle: 'Application Info', + detailTitle: 'Job Details', flinkWebUi: 'Flink Web UI', compareConfig: 'Compare Config', compareFlinkSql: 'Compare Flink SQL', diff --git a/streampark-console/streampark-console-webapp/src/locales/lang/en/spark/app.ts b/streampark-console/streampark-console-webapp/src/locales/lang/en/spark/app.ts index bd2fd4398d..ecc336b8f8 100644 --- a/streampark-console/streampark-console-webapp/src/locales/lang/en/spark/app.ts +++ b/streampark-console/streampark-console-webapp/src/locales/lang/en/spark/app.ts @@ -55,7 +55,7 @@ export default { tags: 'Job Tags', owner: 'Creator', sparkVersion: 'Spark Version', - duration: 'Runtime', + duration: 'Duration', modifiedTime: 'Modification Time', runStatus: 'Run Status', releaseBuild: 'Release Status', diff --git a/streampark-console/streampark-console-webapp/src/locales/lang/en/spark/home.ts b/streampark-console/streampark-console-webapp/src/locales/lang/en/spark/home.ts index b3ef87b7de..19742742d6 100644 --- a/streampark-console/streampark-console-webapp/src/locales/lang/en/spark/home.ts +++ b/streampark-console/streampark-console-webapp/src/locales/lang/en/spark/home.ts @@ -40,7 +40,7 @@ export default { }, form: { sparkName: 'Spark Name', - sparkHome: 'Installation Path', + sparkHome: 'Spark Home', description: 'Description', }, placeholder: { diff --git a/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/menu.ts b/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/menu.ts index c5d3464ec9..4817a885cb 100644 --- a/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/menu.ts +++ b/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/menu.ts @@ -37,9 +37,9 @@ export default { spark: { application: '作业管理', sparkHome: 'Spark 版本', - createApplication: '创建应用', - updateApplication: '更新应用', - applicationDetail: '应用详情', + createApplication: '创建作业', + updateApplication: '更新作业', + applicationDetail: '作业详情', }, setting: { menu: '设置中心', diff --git a/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/spark/app.ts b/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/spark/app.ts index 7a7e406c4f..2c0a1381d7 100644 --- a/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/spark/app.ts +++ b/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/spark/app.ts @@ -25,22 +25,22 @@ export default { VCore: '占用的总 VCores', }, runState: { - added: 'Added', - new: 'New', - saving: 'Saving', - starting: 'Starting', - submitted: 'Submitted', - accept: 'Accepted', - running: 'Running', - finished: 'Finished', - failed: 'Job Failed', - lost: 'Job Lost', - mapping: 'Mapping', - other: 'Other', - revoked: 'Revoked', - stopping: 'Stopping', - success: 'Succeeded', - killed: 'Killed', + added: 'ADDED', + new: 'NEW', + saving: 'SAVING', + starting: '启动中', + submitted: '已提交', + accept: 'ACCEPTED', + running: '运行中', + finished: '已完成', + failed: '失败', + lost: '失联', + mapping: '映射中', + other: '其他', + revoked: '已回滚', + stopping: '停止中', + success: 'SUCCEEDED', + killed: 'KILLED', }, releaseState: { releasing: '发布中', @@ -49,7 +49,7 @@ export default { waiting: '待发布', pending: '待重启', }, - id: '应用ID', + id: '作业ID', appName: '作业名称', searchName: '根据作业名搜索', tags: '作业标签', @@ -64,9 +64,9 @@ export default { historyVersion: '历史版本', resource: '资源', resourcePlaceHolder: '从选择资源', - selectAppPlaceHolder: '选择应用', + selectAppPlaceHolder: '选择作业', dependency: '作业依赖', - appConf: '应用配置', + appConf: '作业配置', resolveOrder: '类加载顺序', parallelism: '并行度', restartSize: '(失败后)重启次数', @@ -95,9 +95,9 @@ export default { programJar: '程序Jar文件', dynamicProperties: '动态参数', hadoopConfigTitle: '系统 Hadoop', - dragUploadTitle: '单击或拖动 jar或py 到此区域以上传', + dragUploadTitle: '单击或拖动 jar 或 py 到此区域以上传', dragUploadTip: '支持单次上传。您可以在此处上传本地 jar 以支持当前作业', - dependencyError: '请先检查Spark版本.', + dependencyError: '请先检查 Spark 版本.', status: '运行状态', startTime: '启动时间', endTime: '结束时间', @@ -118,7 +118,7 @@ export default { lost: '失联', }, detail: { - detailTitle: '应用详情', + detailTitle: '作业详情', webUI: 'Spark Web UI', compareConfig: '比较配置', compareSparkSql: '比较 Spark SQL', diff --git a/streampark-console/streampark-console-webapp/src/views/flink/app/View.vue b/streampark-console/streampark-console-webapp/src/views/flink/app/View.vue index 68c032cb4e..99cea85a21 100644 --- a/streampark-console/streampark-console-webapp/src/views/flink/app/View.vue +++ b/streampark-console/streampark-console-webapp/src/views/flink/app/View.vue @@ -113,7 +113,6 @@ dataSource.forEach((x) => { x.expanded = [ { - appId: x.appId, jmMemory: x.jmMemory, tmMemory: x.tmMemory, totalTM: x.totalTM, diff --git a/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppDetail/DetailTab.vue b/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppDetail/DetailTab.vue index 7df156c9df..f33a12e7a4 100644 --- a/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppDetail/DetailTab.vue +++ b/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppDetail/DetailTab.vue @@ -393,14 +393,14 @@ Cancel -