From 2781c94ef1530c595ef97fdf16b0bb5e47f9a08d Mon Sep 17 00:00:00 2001 From: benjobs Date: Sat, 23 Dec 2023 20:29:33 +0800 Subject: [PATCH] [Improve] flink job on yarn exists check improvement (#3424) Co-authored-by: benjobs --- .../controller/ApplicationController.java | 8 + .../application/ApplicationInfoService.java | 16 + .../impl/ApplicationActionServiceImpl.java | 286 +++++++++--------- .../impl/ApplicationInfoServiceImpl.java | 47 +++ .../src/api/flink/app.ts | 5 + .../Application/src/AppDarkModeToggle.vue | 4 +- .../ContextMenu/src/ContextMenu.vue | 4 +- .../src/components/Form/src/BasicForm.vue | 2 +- .../src/components/Page/src/PageFooter.vue | 4 +- .../Table/src/components/HeaderCell.vue | 2 +- .../src/enums/flinkEnum.ts | 8 + .../src/hooks/web/useLockPage.ts | 9 +- .../src/utils/props.ts | 2 +- .../AppView/StartApplicationModal.vue | 20 +- 14 files changed, 254 insertions(+), 163 deletions(-) diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java index 6059143611..412eabef43 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java @@ -171,6 +171,14 @@ public RestResponse revoke(Application app) { return RestResponse.success(); } + @PermissionAction(id = "#app.id", type = PermissionTypeEnum.APP) + @PostMapping(value = "check_start") + @RequiresPermissions("app:start") + public RestResponse checkStart(Application app) { + AppExistsStateEnum stateEnum = applicationInfoService.checkStart(app); + return RestResponse.success(stateEnum.get()); + } + @Operation( summary = "Start application", tags = {ApiDocConstant.FLINK_APP_OP_TAG}) diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/ApplicationInfoService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/ApplicationInfoService.java index c86ce3cc1c..8d1e0b112d 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/ApplicationInfoService.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/ApplicationInfoService.java @@ -21,6 +21,8 @@ import org.apache.streampark.console.core.entity.Application; import org.apache.streampark.console.core.enums.AppExistsStateEnum; +import org.apache.hadoop.yarn.api.records.ApplicationReport; + import com.baomidou.mybatisplus.extension.service.IService; import java.io.IOException; @@ -220,4 +222,18 @@ public interface ApplicationInfoService extends IService { * @return A list of strings representing the names of the uploaded jars. 
*/
  List<String> listHistoryUploadJars();
+
+  /**
+   * Check the application before starting it, e.g. whether a job with the same name is already
+   * running in YARN.
+   *
+   * @param appParam the application to check
+   * @return the check result, see {@link AppExistsStateEnum}
+   */
+  AppExistsStateEnum checkStart(Application appParam);
+
+  /**
+   * @param appName the application name to look up in YARN
+   * @return the running, submitted and accepted applications in YARN that match the given name
+   */
+  List<ApplicationReport> getYarnAppReport(String appName);
 }
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
index 8f22ca5f47..85188dc43b 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
@@ -19,7 +19,6 @@
 import org.apache.streampark.common.Constant;
 import org.apache.streampark.common.conf.ConfigKeys;
-import org.apache.streampark.common.conf.K8sFlinkConfig;
 import org.apache.streampark.common.conf.Workspace;
 import org.apache.streampark.common.enums.ApplicationType;
 import org.apache.streampark.common.enums.FlinkDevelopmentMode;
@@ -27,7 +26,6 @@
 import org.apache.streampark.common.enums.FlinkRestoreMode;
 import org.apache.streampark.common.enums.ResolveOrder;
 import org.apache.streampark.common.fs.FsOperator;
-import org.apache.streampark.common.util.CompletableFutureUtils;
 import org.apache.streampark.common.util.DeflaterUtils;
 import org.apache.streampark.common.util.ExceptionUtils;
 import org.apache.streampark.common.util.HadoopUtils;
@@ -65,6 +63,7 @@
 import org.apache.streampark.console.core.service.SettingService;
 import org.apache.streampark.console.core.service.VariableService;
 import org.apache.streampark.console.core.service.application.ApplicationActionService;
+import org.apache.streampark.console.core.service.application.ApplicationInfoService;
 import org.apache.streampark.console.core.service.application.ApplicationManageService;
 import org.apache.streampark.console.core.utils.FlinkK8sDataTypeConverterStub;
 import org.apache.streampark.console.core.watcher.FlinkAppHttpWatcher;
@@ -144,6 +143,8 @@ public class ApplicationActionServiceImpl extends ServiceImpl<ApplicationMapper, Application> {
-              applicationLog.setSuccess(true);
-              if (cancelResponse != null && cancelResponse.savePointDir() != null) {
-                String savePointDir = cancelResponse.savePointDir();
-                log.info("savePoint path: {}", savePointDir);
-                SavePoint savePoint = new SavePoint();
-                savePoint.setPath(savePointDir);
-                savePoint.setAppId(application.getId());
-                savePoint.setLatest(true);
-                savePoint.setType(CheckPointTypeEnum.SAVEPOINT.get());
-                savePoint.setCreateTime(new Date());
-                savePoint.setTriggerTime(triggerTime);
-                savePointService.save(savePoint);
+    cancelFuture.whenComplete(
+        (cancelResponse, throwable) -> {
+          cancelFutureMap.remove(application.getId());
+
+          if (throwable != null) {
+            String exception = ExceptionUtils.stringifyException(throwable);
+            applicationLog.setException(exception);
+            applicationLog.setSuccess(false);
+            applicationLogService.save(applicationLog);
+
+            if (throwable instanceof CancellationException) {
+              doStopped(application);
+            } else {
+              log.error("stop flink job failed.", throwable);
+              application.setOptionState(OptionStateEnum.NONE.getValue());
+
application.setState(FlinkAppStateEnum.FAILED.getValue()); + updateById(application); + + if (appParam.getSavePointed()) { + savePointService.expire(application.getId()); } + // re-tracking flink job on kubernetes and logging exception if (isKubernetesApp(application)) { - k8SFlinkTrackMonitor.unWatching(toTrackId(application)); - } - }, - e -> { - if (e.getCause() instanceof CancellationException) { - updateToStopped(application); + TrackId id = toTrackId(application); + k8SFlinkTrackMonitor.unWatching(id); + k8SFlinkTrackMonitor.doWatching(id); } else { - log.error("stop flink job fail.", e); - application.setOptionState(OptionStateEnum.NONE.getValue()); - application.setState(FlinkAppStateEnum.FAILED.getValue()); - updateById(application); - - if (appParam.getSavePointed()) { - savePointService.expire(application.getId()); - } + FlinkAppHttpWatcher.unWatching(application.getId()); + } + } + return; + } - // re-tracking flink job on kubernetes and logging exception - if (isKubernetesApp(application)) { - TrackId id = toTrackId(application); - k8SFlinkTrackMonitor.unWatching(id); - k8SFlinkTrackMonitor.doWatching(id); - } else { - FlinkAppHttpWatcher.unWatching(application.getId()); - } + applicationLog.setSuccess(true); + // save log... + applicationLogService.save(applicationLog); + + if (cancelResponse != null && cancelResponse.savePointDir() != null) { + String savePointDir = cancelResponse.savePointDir(); + log.info("savePoint path: {}", savePointDir); + SavePoint savePoint = new SavePoint(); + savePoint.setPath(savePointDir); + savePoint.setAppId(application.getId()); + savePoint.setLatest(true); + savePoint.setType(CheckPointTypeEnum.SAVEPOINT.get()); + savePoint.setCreateTime(new Date()); + savePoint.setTriggerTime(triggerTime); + savePointService.save(savePoint); + } - String exception = ExceptionUtils.stringifyException(e); - applicationLog.setException(exception); - applicationLog.setSuccess(false); - } - }) - .whenComplete( - (t, e) -> { - cancelFutureMap.remove(application.getId()); - applicationLogService.save(applicationLog); - }); + if (isKubernetesApp(application)) { + k8SFlinkTrackMonitor.unWatching(toTrackId(application)); + } + }); } @Override @@ -392,7 +393,7 @@ public void start(Application appParam, boolean auto) throws Exception { if (FlinkExecutionMode.isYarnMode(application.getFlinkExecutionMode())) { ApiAlertException.throwIfTrue( - checkAppRepeatInYarn(application.getJobName()), + !applicationInfoService.getYarnAppReport(application.getJobName()).isEmpty(), "[StreamPark] The same task name is already running in the yarn queue"); } @@ -486,98 +487,87 @@ public void start(Application appParam, boolean auto) throws Exception { startFutureMap.put(application.getId(), future); - CompletableFutureUtils.runTimeout( - future, - 2L, - TimeUnit.MINUTES, - submitResponse -> { - if (submitResponse.flinkConfig() != null) { - String jmMemory = - submitResponse.flinkConfig().get(ConfigKeys.KEY_FLINK_JM_PROCESS_MEMORY()); - if (jmMemory != null) { - application.setJmMemory(MemorySize.parse(jmMemory).getMebiBytes()); - } - String tmMemory = - submitResponse.flinkConfig().get(ConfigKeys.KEY_FLINK_TM_PROCESS_MEMORY()); - if (tmMemory != null) { - application.setTmMemory(MemorySize.parse(tmMemory).getMebiBytes()); - } - } - application.setAppId(submitResponse.clusterId()); - if (StringUtils.isNoneEmpty(submitResponse.jobId())) { - application.setJobId(submitResponse.jobId()); - } - - if (StringUtils.isNoneEmpty(submitResponse.jobManagerUrl())) { - 
application.setJobManagerUrl(submitResponse.jobManagerUrl()); - applicationLog.setJobManagerUrl(submitResponse.jobManagerUrl()); - } - applicationLog.setYarnAppId(submitResponse.clusterId()); - application.setStartTime(new Date()); - application.setEndTime(null); - if (isKubernetesApp(application)) { - application.setRelease(ReleaseStateEnum.DONE.get()); - } - updateById(application); - - // if start completed, will be added task to tracking queue - if (isKubernetesApp(application)) { - k8SFlinkTrackMonitor.doWatching(toTrackId(application)); + future.whenComplete( + (response, throwable) -> { + // 1) remove Future + startFutureMap.remove(application.getId()); + + // 2) exception + if (throwable != null) { + String exception = ExceptionUtils.stringifyException(throwable); + applicationLog.setException(exception); + applicationLog.setSuccess(false); + applicationLogService.save(applicationLog); + if (throwable instanceof CancellationException) { + doStopped(application); + } else { + Application app = getById(appParam.getId()); + app.setState(FlinkAppStateEnum.FAILED.getValue()); + app.setOptionState(OptionStateEnum.NONE.getValue()); + updateById(app); + if (isKubernetesApp(app)) { + k8SFlinkTrackMonitor.unWatching(toTrackId(app)); } else { - FlinkAppHttpWatcher.setOptionState(appParam.getId(), OptionStateEnum.STARTING); - FlinkAppHttpWatcher.doWatching(application); + FlinkAppHttpWatcher.unWatching(appParam.getId()); } + } + return; + } - applicationLog.setSuccess(true); - // set savepoint to expire - savePointService.expire(application.getId()); - }, - e -> { - if (e.getCause() instanceof CancellationException) { - updateToStopped(application); - } else { - String exception = ExceptionUtils.stringifyException(e); - applicationLog.setException(exception); - applicationLog.setSuccess(false); - Application app = getById(appParam.getId()); - app.setState(FlinkAppStateEnum.FAILED.getValue()); - app.setOptionState(OptionStateEnum.NONE.getValue()); - updateById(app); - if (isKubernetesApp(app)) { - k8SFlinkTrackMonitor.unWatching(toTrackId(app)); - } else { - FlinkAppHttpWatcher.unWatching(appParam.getId()); - } - } - }) - .whenComplete( - (t, e) -> { - if (!K8sFlinkConfig.isV2Enabled() - && FlinkExecutionMode.isKubernetesApplicationMode( - application.getExecutionMode())) { - String domainName = settingService.getIngressModeDefault(); - if (StringUtils.isNotBlank(domainName)) { - try { - IngressController.configureIngress( - domainName, application.getClusterId(), application.getK8sNamespace()); - } catch (KubernetesClientException kubernetesClientException) { - log.info( - "Failed to create ingress, stack info:{}", - kubernetesClientException.getMessage()); - applicationLog.setException(e.getMessage()); - applicationLog.setSuccess(false); - applicationLogService.save(applicationLog); - application.setState(FlinkAppStateEnum.FAILED.getValue()); - application.setOptionState(OptionStateEnum.NONE.getValue()); - updateById(application); - return; - } + // 3) success + applicationLog.setSuccess(true); + if (response.flinkConfig() != null) { + String jmMemory = response.flinkConfig().get(ConfigKeys.KEY_FLINK_JM_PROCESS_MEMORY()); + if (jmMemory != null) { + application.setJmMemory(MemorySize.parse(jmMemory).getMebiBytes()); + } + String tmMemory = response.flinkConfig().get(ConfigKeys.KEY_FLINK_TM_PROCESS_MEMORY()); + if (tmMemory != null) { + application.setTmMemory(MemorySize.parse(tmMemory).getMebiBytes()); + } + } + application.setAppId(response.clusterId()); + if 
(StringUtils.isNoneEmpty(response.jobId())) { + application.setJobId(response.jobId()); + } + + if (StringUtils.isNoneEmpty(response.jobManagerUrl())) { + application.setJobManagerUrl(response.jobManagerUrl()); + applicationLog.setJobManagerUrl(response.jobManagerUrl()); + } + applicationLog.setYarnAppId(response.clusterId()); + application.setStartTime(new Date()); + application.setEndTime(null); + + // if start completed, will be added task to tracking queue + if (isKubernetesApp(application)) { + application.setRelease(ReleaseStateEnum.DONE.get()); + k8SFlinkTrackMonitor.doWatching(toTrackId(application)); + if (FlinkExecutionMode.isKubernetesApplicationMode(application.getExecutionMode())) { + String domainName = settingService.getIngressModeDefault(); + if (StringUtils.isNotBlank(domainName)) { + try { + IngressController.configureIngress( + domainName, application.getClusterId(), application.getK8sNamespace()); + } catch (KubernetesClientException e) { + log.info("Failed to create ingress, stack info:{}", e.getMessage()); + applicationLog.setException(e.getMessage()); + applicationLog.setSuccess(false); + applicationLogService.save(applicationLog); + application.setState(FlinkAppStateEnum.FAILED.getValue()); + application.setOptionState(OptionStateEnum.NONE.getValue()); } } - - applicationLogService.save(applicationLog); - startFutureMap.remove(application.getId()); - }); + } + } else { + FlinkAppHttpWatcher.setOptionState(appParam.getId(), OptionStateEnum.STARTING); + FlinkAppHttpWatcher.doWatching(application); + } + // update app + updateById(application); + // save log + applicationLogService.save(applicationLog); + }); } /** @@ -781,8 +771,8 @@ private Map getProperties(Application application) { return properties; } - private void updateToStopped(Application app) { - Application application = getById(app); + private void doStopped(Application appParam) { + Application application = getById(appParam); application.setOptionState(OptionStateEnum.NONE.getValue()); application.setState(FlinkAppStateEnum.CANCELED.getValue()); application.setOptionTime(new Date()); @@ -796,6 +786,18 @@ private void updateToStopped(Application app) { } else { FlinkAppHttpWatcher.unWatching(application.getId()); } + // kill application + if (FlinkExecutionMode.isYarnMode(application.getFlinkExecutionMode())) { + try { + List applications = + applicationInfoService.getYarnAppReport(application.getJobName()); + if (!applications.isEmpty()) { + YarnClient yarnClient = HadoopUtils.yarnClient(); + yarnClient.killApplication(applications.get(0).getApplicationId()); + } + } catch (Exception ignored) { + } + } } private String getSavePointed(Application appParam) { diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java index 11732cbc1f..95883a07e9 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java @@ -20,9 +20,11 @@ import org.apache.streampark.common.Constant; import org.apache.streampark.common.conf.K8sFlinkConfig; import org.apache.streampark.common.conf.Workspace; +import 
org.apache.streampark.common.enums.ApplicationType;
 import org.apache.streampark.common.enums.FlinkExecutionMode;
 import org.apache.streampark.common.fs.LfsOperator;
 import org.apache.streampark.common.util.ExceptionUtils;
+import org.apache.streampark.common.util.HadoopUtils;
 import org.apache.streampark.common.util.Utils;
 import org.apache.streampark.common.util.YarnUtils;
 import org.apache.streampark.console.base.exception.ApiAlertException;
@@ -51,9 +53,13 @@
 import org.apache.streampark.flink.kubernetes.model.FlinkMetricCV;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.api.YarnClient;
 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
 import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
+import com.google.common.collect.Sets;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
@@ -65,9 +71,11 @@
 import java.util.Arrays;
 import java.util.Base64;
 import java.util.Comparator;
+import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.jar.Manifest;
@@ -317,6 +325,45 @@ public List<String> listHistoryUploadJars() {
         .collect(Collectors.toList());
   }
 
+  @Override
+  public AppExistsStateEnum checkStart(Application appParam) {
+    Application application = getById(appParam.getId());
+    if (application == null) {
+      return AppExistsStateEnum.INVALID;
+    }
+    if (FlinkExecutionMode.isYarnMode(application.getExecutionMode())) {
+      boolean exists = !getYarnAppReport(application.getJobName()).isEmpty();
+      return exists ? AppExistsStateEnum.IN_YARN : AppExistsStateEnum.NO;
+    }
+    // todo on k8s check...
+    return AppExistsStateEnum.NO;
+  }
+
+  @Override
+  public List<ApplicationReport> getYarnAppReport(String appName) {
+    try {
+      YarnClient yarnClient = HadoopUtils.yarnClient();
+      Set<String> types =
+          Sets.newHashSet(
+              ApplicationType.STREAMPARK_FLINK.getName(), ApplicationType.APACHE_FLINK.getName());
+      EnumSet<YarnApplicationState> states =
+          EnumSet.of(
+              YarnApplicationState.NEW,
+              YarnApplicationState.NEW_SAVING,
+              YarnApplicationState.SUBMITTED,
+              YarnApplicationState.ACCEPTED,
+              YarnApplicationState.RUNNING);
+      Set<String> yarnTag = Sets.newHashSet("streampark");
+      List<ApplicationReport> applications = yarnClient.getApplications(types, states, yarnTag);
+      return applications.stream()
+          .filter(report -> report.getName().equals(appName))
+          .collect(Collectors.toList());
+    } catch (Exception e) {
+      throw new RuntimeException(
+          "getYarnAppReport failed. Ensure that yarn is running properly. 
", e); + } + } + @Override public String k8sStartLog(Long id, Integer offset, Integer limit) throws Exception { Application application = getById(id); diff --git a/streampark-console/streampark-console-webapp/src/api/flink/app.ts b/streampark-console/streampark-console-webapp/src/api/flink/app.ts index 13110db387..f32dbda465 100644 --- a/streampark-console/streampark-console-webapp/src/api/flink/app.ts +++ b/streampark-console/streampark-console-webapp/src/api/flink/app.ts @@ -40,6 +40,7 @@ enum APP_API { DELETE = '/flink/app/delete', DELETE_BAK = '/flink/app/deletebak', CREATE = '/flink/app/create', + CHECK_START = '/flink/app/check_start', START = '/flink/app/start', CLEAN = '/flink/app/clean', BACKUPS = '/flink/app/backups', @@ -228,3 +229,7 @@ export function fetchCancel(data: CancelParam): Promise { export function fetchName(data: { config: string }) { return defHttp.post({ url: APP_API.NAME, data }); } + +export function fetchCheckStart(data): Promise> { + return defHttp.post({ url: APP_API.CHECK_START, data }, { isReturnNativeResponse: true }); +} diff --git a/streampark-console/streampark-console-webapp/src/components/Application/src/AppDarkModeToggle.vue b/streampark-console/streampark-console-webapp/src/components/Application/src/AppDarkModeToggle.vue index d4e0ce1646..19ba3b1514 100644 --- a/streampark-console/streampark-console-webapp/src/components/Application/src/AppDarkModeToggle.vue +++ b/streampark-console/streampark-console-webapp/src/components/Application/src/AppDarkModeToggle.vue @@ -63,9 +63,7 @@ height: 18px; background-color: #fff; border-radius: 50%; - transition: - transform 0.5s, - background-color 0.5s; + transition: transform 0.5s, background-color 0.5s; will-change: transform; } diff --git a/streampark-console/streampark-console-webapp/src/components/ContextMenu/src/ContextMenu.vue b/streampark-console/streampark-console-webapp/src/components/ContextMenu/src/ContextMenu.vue index 78cac5c5b6..e08c25f36b 100644 --- a/streampark-console/streampark-console-webapp/src/components/ContextMenu/src/ContextMenu.vue +++ b/streampark-console/streampark-console-webapp/src/components/ContextMenu/src/ContextMenu.vue @@ -179,9 +179,7 @@ background-color: @component-background; border: 1px solid rgb(0 0 0 / 8%); border-radius: 0.25rem; - box-shadow: - 0 2px 2px 0 rgb(0 0 0 / 14%), - 0 3px 1px -2px rgb(0 0 0 / 10%), + box-shadow: 0 2px 2px 0 rgb(0 0 0 / 14%), 0 3px 1px -2px rgb(0 0 0 / 10%), 0 1px 5px 0 rgb(0 0 0 / 6%); background-clip: padding-box; user-select: none; diff --git a/streampark-console/streampark-console-webapp/src/components/Form/src/BasicForm.vue b/streampark-console/streampark-console-webapp/src/components/Form/src/BasicForm.vue index e5a9dacf62..1cd7e3809b 100644 --- a/streampark-console/streampark-console-webapp/src/components/Form/src/BasicForm.vue +++ b/streampark-console/streampark-console-webapp/src/components/Form/src/BasicForm.vue @@ -113,7 +113,7 @@ }); const getBindValue = computed( - () => ({ ...attrs, ...props, ...unref(getProps) }) as Recordable, + () => ({ ...attrs, ...props, ...unref(getProps) } as Recordable), ); const getSchema = computed((): FormSchema[] => { diff --git a/streampark-console/streampark-console-webapp/src/components/Page/src/PageFooter.vue b/streampark-console/streampark-console-webapp/src/components/Page/src/PageFooter.vue index 8fdbc8f41a..e89a6ce979 100644 --- a/streampark-console/streampark-console-webapp/src/components/Page/src/PageFooter.vue +++ 
b/streampark-console/streampark-console-webapp/src/components/Page/src/PageFooter.vue @@ -39,9 +39,7 @@ line-height: 44px; background-color: @component-background; border-top: 1px solid @border-color-base; - box-shadow: - 0 -6px 16px -8px rgb(0 0 0 / 8%), - 0 -9px 28px 0 rgb(0 0 0 / 5%), + box-shadow: 0 -6px 16px -8px rgb(0 0 0 / 8%), 0 -9px 28px 0 rgb(0 0 0 / 5%), 0 -12px 48px 16px rgb(0 0 0 / 3%); transition: width 0.2s; diff --git a/streampark-console/streampark-console-webapp/src/components/Table/src/components/HeaderCell.vue b/streampark-console/streampark-console-webapp/src/components/Table/src/components/HeaderCell.vue index 36ab854c59..35c0802696 100644 --- a/streampark-console/streampark-console-webapp/src/components/Table/src/components/HeaderCell.vue +++ b/streampark-console/streampark-console-webapp/src/components/Table/src/components/HeaderCell.vue @@ -22,7 +22,7 @@ props: { column: { type: Object as PropType, - default: () => ({}) as BasicColumn, + default: () => ({} as BasicColumn), }, }, setup(props) { diff --git a/streampark-console/streampark-console-webapp/src/enums/flinkEnum.ts b/streampark-console/streampark-console-webapp/src/enums/flinkEnum.ts index ef463403b0..b0b2f83ed1 100644 --- a/streampark-console/streampark-console-webapp/src/enums/flinkEnum.ts +++ b/streampark-console/streampark-console-webapp/src/enums/flinkEnum.ts @@ -226,3 +226,11 @@ export enum RestoreModeEnum { CLAIM = 2, LEGACY = 3, } + +export enum AppExistsEnum { + NO = 0, + IN_DB = 1, + IN_YARN = 2, + IN_KUBERNETES = 3, + INVALID = 4, +} diff --git a/streampark-console/streampark-console-webapp/src/hooks/web/useLockPage.ts b/streampark-console/streampark-console-webapp/src/hooks/web/useLockPage.ts index 9a66074210..c543be9542 100644 --- a/streampark-console/streampark-console-webapp/src/hooks/web/useLockPage.ts +++ b/streampark-console/streampark-console-webapp/src/hooks/web/useLockPage.ts @@ -32,12 +32,9 @@ export function useLockPage() { } clear(); - timeId = setTimeout( - () => { - lockPage(); - }, - lockTime * 60 * 1000, - ); + timeId = setTimeout(() => { + lockPage(); + }, lockTime * 60 * 1000); } function lockPage(): void { diff --git a/streampark-console/streampark-console-webapp/src/utils/props.ts b/streampark-console/streampark-console-webapp/src/utils/props.ts index 368f490f21..4a15ec465b 100644 --- a/streampark-console/streampark-console-webapp/src/utils/props.ts +++ b/streampark-console/streampark-console-webapp/src/utils/props.ts @@ -175,7 +175,7 @@ export const buildProps = < : never; }; -export const definePropType = (val: any) => ({ [wrapperKey]: val }) as PropWrapper; +export const definePropType = (val: any) => ({ [wrapperKey]: val } as PropWrapper); export const keyOf = (arr: T) => Object.keys(arr) as Array; export const mutable = >(val: T) => diff --git a/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppView/StartApplicationModal.vue b/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppView/StartApplicationModal.vue index 359f20acf1..98a0f01884 100644 --- a/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppView/StartApplicationModal.vue +++ b/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppView/StartApplicationModal.vue @@ -31,8 +31,9 @@ import { BasicModal, useModalInner } from '/@/components/Modal'; import { useMessage } from '/@/hooks/web/useMessage'; import { useRouter } from 'vue-router'; - import { fetchStart } from '/@/api/flink/app'; - import { 
RestoreModeEnum } from '/@/enums/flinkEnum';
+  import { fetchCheckStart, fetchForcedStop, fetchStart } from '/@/api/flink/app';
+
+  import { AppExistsEnum, RestoreModeEnum } from '/@/enums/flinkEnum';
   import { fetchFlinkEnv } from '/@/api/flink/flinkEnv';
   import { renderFlinkAppRestoreMode } from '/@/views/flink/app/hooks/useFlinkRender';
@@ -121,8 +122,21 @@
     baseColProps: { span: 24 },
   });
-  /* submit */
   async function handleSubmit() {
+    // if the same job already exists in YARN, force-stop it before starting
+    const resp = await fetchCheckStart({
+      id: receiveData.application.id,
+    });
+    if (resp.data.data === AppExistsEnum.IN_YARN) {
+      await fetchForcedStop({
+        id: receiveData.application.id,
+      });
+    }
+    await handleDoSubmit();
+  }
+
+  /* submit */
+  async function handleDoSubmit() {
     try {
       const formValue = (await validate()) as Recordable;
       const savePointed = formValue.startSavePointed;