Skip to content

Commit

Permalink
[Improve] flink job on yarn exists check improvement (#3424)
Browse files Browse the repository at this point in the history
Co-authored-by: benjobs <[email protected]>
  • Loading branch information
wolfboys and benjobs authored Dec 23, 2023
1 parent 697d8dc commit 2781c94
Show file tree
Hide file tree
Showing 14 changed files with 254 additions and 163 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,14 @@ public RestResponse revoke(Application app) {
return RestResponse.success();
}

@PermissionAction(id = "#app.id", type = PermissionTypeEnum.APP)
@PostMapping(value = "check_start")
@RequiresPermissions("app:start")
public RestResponse checkStart(Application app) {
AppExistsStateEnum stateEnum = applicationInfoService.checkStart(app);
return RestResponse.success(stateEnum.get());
}

@Operation(
summary = "Start application",
tags = {ApiDocConstant.FLINK_APP_OP_TAG})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import org.apache.streampark.console.core.entity.Application;
import org.apache.streampark.console.core.enums.AppExistsStateEnum;

import org.apache.hadoop.yarn.api.records.ApplicationReport;

import com.baomidou.mybatisplus.extension.service.IService;

import java.io.IOException;
Expand Down Expand Up @@ -220,4 +222,18 @@ public interface ApplicationInfoService extends IService<Application> {
* @return A list of strings representing the names of the uploaded jars.
*/
List<String> listHistoryUploadJars();

/**
* check application before start
*
* @param appParam
* @return org.apache.streampark.console.core.enums.AppExistsStateEnum
*/
AppExistsStateEnum checkStart(Application appParam);

/**
* @param appName
* @return running,submitted, accepted job list in YARN
*/
List<ApplicationReport> getYarnAppReport(String appName);
}

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@
import org.apache.streampark.common.Constant;
import org.apache.streampark.common.conf.K8sFlinkConfig;
import org.apache.streampark.common.conf.Workspace;
import org.apache.streampark.common.enums.ApplicationType;
import org.apache.streampark.common.enums.FlinkExecutionMode;
import org.apache.streampark.common.fs.LfsOperator;
import org.apache.streampark.common.util.ExceptionUtils;
import org.apache.streampark.common.util.HadoopUtils;
import org.apache.streampark.common.util.Utils;
import org.apache.streampark.common.util.YarnUtils;
import org.apache.streampark.console.base.exception.ApiAlertException;
Expand Down Expand Up @@ -51,9 +53,13 @@
import org.apache.streampark.flink.kubernetes.model.FlinkMetricCV;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;

import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.google.common.collect.Sets;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
Expand All @@ -65,9 +71,11 @@
import java.util.Arrays;
import java.util.Base64;
import java.util.Comparator;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.jar.Manifest;
Expand Down Expand Up @@ -317,6 +325,45 @@ public List<String> listHistoryUploadJars() {
.collect(Collectors.toList());
}

@Override
public AppExistsStateEnum checkStart(Application appParam) {
Application application = getById(appParam.getId());
if (application == null) {
return AppExistsStateEnum.INVALID;
}
if (FlinkExecutionMode.isYarnMode(application.getExecutionMode())) {
boolean exists = !getYarnAppReport(application.getJobName()).isEmpty();
return exists ? AppExistsStateEnum.IN_YARN : AppExistsStateEnum.NO;
}
// todo on k8s check...
return AppExistsStateEnum.NO;
}

@Override
public List<ApplicationReport> getYarnAppReport(String appName) {
try {
YarnClient yarnClient = HadoopUtils.yarnClient();
Set<String> types =
Sets.newHashSet(
ApplicationType.STREAMPARK_FLINK.getName(), ApplicationType.APACHE_FLINK.getName());
EnumSet<YarnApplicationState> states =
EnumSet.of(
YarnApplicationState.NEW,
YarnApplicationState.NEW_SAVING,
YarnApplicationState.SUBMITTED,
YarnApplicationState.ACCEPTED,
YarnApplicationState.RUNNING);
Set<String> yarnTag = Sets.newHashSet("streampark");
List<ApplicationReport> applications = yarnClient.getApplications(types, states, yarnTag);
return applications.stream()
.filter(report -> report.getName().equals(appName))
.collect(Collectors.toList());
} catch (Exception e) {
throw new RuntimeException(
"getYarnAppReport failed. Ensure that yarn is running properly. ", e);
}
}

@Override
public String k8sStartLog(Long id, Integer offset, Integer limit) throws Exception {
Application application = getById(id);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ enum APP_API {
DELETE = '/flink/app/delete',
DELETE_BAK = '/flink/app/deletebak',
CREATE = '/flink/app/create',
CHECK_START = '/flink/app/check_start',
START = '/flink/app/start',
CLEAN = '/flink/app/clean',
BACKUPS = '/flink/app/backups',
Expand Down Expand Up @@ -228,3 +229,7 @@ export function fetchCancel(data: CancelParam): Promise<boolean> {
export function fetchName(data: { config: string }) {
return defHttp.post({ url: APP_API.NAME, data });
}

export function fetchCheckStart(data): Promise<AxiosResponse<number>> {
return defHttp.post({ url: APP_API.CHECK_START, data }, { isReturnNativeResponse: true });
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,7 @@
height: 18px;
background-color: #fff;
border-radius: 50%;
transition:
transform 0.5s,
background-color 0.5s;
transition: transform 0.5s, background-color 0.5s;
will-change: transform;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,9 +179,7 @@
background-color: @component-background;
border: 1px solid rgb(0 0 0 / 8%);
border-radius: 0.25rem;
box-shadow:
0 2px 2px 0 rgb(0 0 0 / 14%),
0 3px 1px -2px rgb(0 0 0 / 10%),
box-shadow: 0 2px 2px 0 rgb(0 0 0 / 14%), 0 3px 1px -2px rgb(0 0 0 / 10%),
0 1px 5px 0 rgb(0 0 0 / 6%);
background-clip: padding-box;
user-select: none;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@
});
const getBindValue = computed(
() => ({ ...attrs, ...props, ...unref(getProps) }) as Recordable,
() => ({ ...attrs, ...props, ...unref(getProps) } as Recordable),
);
const getSchema = computed((): FormSchema[] => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,7 @@
line-height: 44px;
background-color: @component-background;
border-top: 1px solid @border-color-base;
box-shadow:
0 -6px 16px -8px rgb(0 0 0 / 8%),
0 -9px 28px 0 rgb(0 0 0 / 5%),
box-shadow: 0 -6px 16px -8px rgb(0 0 0 / 8%), 0 -9px 28px 0 rgb(0 0 0 / 5%),
0 -12px 48px 16px rgb(0 0 0 / 3%);
transition: width 0.2s;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
props: {
column: {
type: Object as PropType<BasicColumn>,
default: () => ({}) as BasicColumn,
default: () => ({} as BasicColumn),
},
},
setup(props) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,3 +226,11 @@ export enum RestoreModeEnum {
CLAIM = 2,
LEGACY = 3,
}

export enum AppExistsEnum {
NO = 0,
IN_DB = 1,
IN_YARN = 2,
IN_KUBERNETES = 3,
INVALID = 4,
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,9 @@ export function useLockPage() {
}
clear();

timeId = setTimeout(
() => {
lockPage();
},
lockTime * 60 * 1000,
);
timeId = setTimeout(() => {
lockPage();
}, lockTime * 60 * 1000);
}

function lockPage(): void {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ export const buildProps = <
: never;
};

export const definePropType = <T>(val: any) => ({ [wrapperKey]: val }) as PropWrapper<T>;
export const definePropType = <T>(val: any) => ({ [wrapperKey]: val } as PropWrapper<T>);

export const keyOf = <T extends Object>(arr: T) => Object.keys(arr) as Array<keyof T>;
export const mutable = <T extends readonly any[] | Record<string, unknown>>(val: T) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,9 @@
import { BasicModal, useModalInner } from '/@/components/Modal';
import { useMessage } from '/@/hooks/web/useMessage';
import { useRouter } from 'vue-router';
import { fetchStart } from '/@/api/flink/app';
import { RestoreModeEnum } from '/@/enums/flinkEnum';
import { fetchCheckStart, fetchForcedStop, fetchStart } from '/@/api/flink/app';

import { AppExistsEnum, RestoreModeEnum } from '/@/enums/flinkEnum';
import { fetchFlinkEnv } from '/@/api/flink/flinkEnv';
import { renderFlinkAppRestoreMode } from '/@/views/flink/app/hooks/useFlinkRender';

Expand Down Expand Up @@ -121,8 +122,21 @@
baseColProps: { span: 24 },
});

/* submit */
async function handleSubmit() {
// when then app is building, show forced starting modal
const resp = await fetchCheckStart({
id: receiveData.application.id,
});
if (resp.data.data === AppExistsEnum.IN_YARN) {
await fetchForcedStop({
id: receiveData.application.id,
});
}
await handleDoSubmit();
}

/* submit */
async function handleDoSubmit() {
try {
const formValue = (await validate()) as Recordable;
const savePointed = formValue.startSavePointed;
Expand Down

0 comments on commit 2781c94

Please sign in to comment.