Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Improve] flink job on yarn exists check improvement #3424

Merged
merged 1 commit into from
Dec 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,14 @@ public RestResponse revoke(Application app) {
return RestResponse.success();
}

@PermissionAction(id = "#app.id", type = PermissionTypeEnum.APP)
@PostMapping(value = "check_start")
@RequiresPermissions("app:start")
public RestResponse checkStart(Application app) {
AppExistsStateEnum stateEnum = applicationInfoService.checkStart(app);
return RestResponse.success(stateEnum.get());
}

@Operation(
summary = "Start application",
tags = {ApiDocConstant.FLINK_APP_OP_TAG})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import org.apache.streampark.console.core.entity.Application;
import org.apache.streampark.console.core.enums.AppExistsStateEnum;

import org.apache.hadoop.yarn.api.records.ApplicationReport;

import com.baomidou.mybatisplus.extension.service.IService;

import java.io.IOException;
Expand Down Expand Up @@ -220,4 +222,18 @@ public interface ApplicationInfoService extends IService<Application> {
* @return A list of strings representing the names of the uploaded jars.
*/
List<String> listHistoryUploadJars();

/**
* check application before start
*
* @param appParam
* @return org.apache.streampark.console.core.enums.AppExistsStateEnum
*/
AppExistsStateEnum checkStart(Application appParam);

/**
* @param appName
* @return running,submitted, accepted job list in YARN
*/
List<ApplicationReport> getYarnAppReport(String appName);
}

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@
import org.apache.streampark.common.Constant;
import org.apache.streampark.common.conf.K8sFlinkConfig;
import org.apache.streampark.common.conf.Workspace;
import org.apache.streampark.common.enums.ApplicationType;
import org.apache.streampark.common.enums.FlinkExecutionMode;
import org.apache.streampark.common.fs.LfsOperator;
import org.apache.streampark.common.util.ExceptionUtils;
import org.apache.streampark.common.util.HadoopUtils;
import org.apache.streampark.common.util.Utils;
import org.apache.streampark.common.util.YarnUtils;
import org.apache.streampark.console.base.exception.ApiAlertException;
Expand Down Expand Up @@ -51,9 +53,13 @@
import org.apache.streampark.flink.kubernetes.model.FlinkMetricCV;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;

import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.google.common.collect.Sets;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
Expand All @@ -65,9 +71,11 @@
import java.util.Arrays;
import java.util.Base64;
import java.util.Comparator;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.jar.Manifest;
Expand Down Expand Up @@ -317,6 +325,45 @@ public List<String> listHistoryUploadJars() {
.collect(Collectors.toList());
}

@Override
public AppExistsStateEnum checkStart(Application appParam) {
Application application = getById(appParam.getId());
if (application == null) {
return AppExistsStateEnum.INVALID;
}
if (FlinkExecutionMode.isYarnMode(application.getExecutionMode())) {
boolean exists = !getYarnAppReport(application.getJobName()).isEmpty();
return exists ? AppExistsStateEnum.IN_YARN : AppExistsStateEnum.NO;
}
// todo on k8s check...
return AppExistsStateEnum.NO;
}

@Override
public List<ApplicationReport> getYarnAppReport(String appName) {
try {
YarnClient yarnClient = HadoopUtils.yarnClient();
Set<String> types =
Sets.newHashSet(
ApplicationType.STREAMPARK_FLINK.getName(), ApplicationType.APACHE_FLINK.getName());
EnumSet<YarnApplicationState> states =
EnumSet.of(
YarnApplicationState.NEW,
YarnApplicationState.NEW_SAVING,
YarnApplicationState.SUBMITTED,
YarnApplicationState.ACCEPTED,
YarnApplicationState.RUNNING);
Set<String> yarnTag = Sets.newHashSet("streampark");
List<ApplicationReport> applications = yarnClient.getApplications(types, states, yarnTag);
return applications.stream()
.filter(report -> report.getName().equals(appName))
.collect(Collectors.toList());
} catch (Exception e) {
throw new RuntimeException(
"getYarnAppReport failed. Ensure that yarn is running properly. ", e);
}
}

@Override
public String k8sStartLog(Long id, Integer offset, Integer limit) throws Exception {
Application application = getById(id);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ enum APP_API {
DELETE = '/flink/app/delete',
DELETE_BAK = '/flink/app/deletebak',
CREATE = '/flink/app/create',
CHECK_START = '/flink/app/check_start',
START = '/flink/app/start',
CLEAN = '/flink/app/clean',
BACKUPS = '/flink/app/backups',
Expand Down Expand Up @@ -228,3 +229,7 @@ export function fetchCancel(data: CancelParam): Promise<boolean> {
export function fetchName(data: { config: string }) {
return defHttp.post({ url: APP_API.NAME, data });
}

export function fetchCheckStart(data): Promise<AxiosResponse<number>> {
return defHttp.post({ url: APP_API.CHECK_START, data }, { isReturnNativeResponse: true });
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,7 @@
height: 18px;
background-color: #fff;
border-radius: 50%;
transition:
transform 0.5s,
background-color 0.5s;
transition: transform 0.5s, background-color 0.5s;
will-change: transform;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,9 +179,7 @@
background-color: @component-background;
border: 1px solid rgb(0 0 0 / 8%);
border-radius: 0.25rem;
box-shadow:
0 2px 2px 0 rgb(0 0 0 / 14%),
0 3px 1px -2px rgb(0 0 0 / 10%),
box-shadow: 0 2px 2px 0 rgb(0 0 0 / 14%), 0 3px 1px -2px rgb(0 0 0 / 10%),
0 1px 5px 0 rgb(0 0 0 / 6%);
background-clip: padding-box;
user-select: none;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@
});

const getBindValue = computed(
() => ({ ...attrs, ...props, ...unref(getProps) }) as Recordable,
() => ({ ...attrs, ...props, ...unref(getProps) } as Recordable),
);

const getSchema = computed((): FormSchema[] => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -333,8 +333,8 @@
return slot
? getSlot(slots, slot, unref(getValues))
: render
? render(unref(getValues))
: renderComponent();
? render(unref(getValues))
: renderComponent();
};

const showSuffix = !!suffix;
Expand Down Expand Up @@ -382,8 +382,8 @@
return colSlot
? getSlot(slots, colSlot, values)
: renderColContent
? renderColContent(values)
: renderItem();
? renderColContent(values)
: renderItem();
};

return (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,8 @@
realHeightRef.value = props.height
? props.height
: realHeight > maxHeight
? maxHeight
: realHeight;
? maxHeight
: realHeight;
}
emit('height-change', unref(realHeightRef));
} catch (error) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,7 @@
line-height: 44px;
background-color: @component-background;
border-top: 1px solid @border-color-base;
box-shadow:
0 -6px 16px -8px rgb(0 0 0 / 8%),
0 -9px 28px 0 rgb(0 0 0 / 5%),
box-shadow: 0 -6px 16px -8px rgb(0 0 0 / 8%), 0 -9px 28px 0 rgb(0 0 0 / 5%),
0 -12px 48px 16px rgb(0 0 0 / 3%);
transition: width 0.2s;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
props: {
column: {
type: Object as PropType<BasicColumn>,
default: () => ({}) as BasicColumn,
default: () => ({} as BasicColumn),
},
},
setup(props) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,3 +226,11 @@ export enum RestoreModeEnum {
CLAIM = 2,
LEGACY = 3,
}

export enum AppExistsEnum {
NO = 0,
IN_DB = 1,
IN_YARN = 2,
IN_KUBERNETES = 3,
INVALID = 4,
}
Original file line number Diff line number Diff line change
Expand Up @@ -105,18 +105,18 @@ export function useMenuSetting() {
return siderHidden
? 0
: collapsedShowTitle
? SIDE_BAR_SHOW_TIT_MINI_WIDTH
: SIDE_BAR_MINI_WIDTH;
? SIDE_BAR_SHOW_TIT_MINI_WIDTH
: SIDE_BAR_MINI_WIDTH;
});

const getCalcContentWidth = computed(() => {
const width =
unref(getIsTopMenu) || !unref(getShowMenu) || (unref(getSplit) && unref(getMenuHidden))
? 0
: unref(getIsMixSidebar)
? (unref(getCollapsed) ? SIDE_BAR_MINI_WIDTH : SIDE_BAR_SHOW_TIT_MINI_WIDTH) +
(unref(getMixSideFixed) && unref(mixSideHasChildren) ? unref(getRealWidth) : 0)
: unref(getRealWidth);
? (unref(getCollapsed) ? SIDE_BAR_MINI_WIDTH : SIDE_BAR_SHOW_TIT_MINI_WIDTH) +
(unref(getMixSideFixed) && unref(mixSideHasChildren) ? unref(getRealWidth) : 0)
: unref(getRealWidth);

return `calc(100% - ${unref(width)}px)`;
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,9 @@ export function useLockPage() {
}
clear();

timeId = setTimeout(
() => {
lockPage();
},
lockTime * 60 * 1000,
);
timeId = setTimeout(() => {
lockPage();
}, lockTime * 60 * 1000);
}

function lockPage(): void {
Expand Down
48 changes: 24 additions & 24 deletions streampark-console/streampark-console-webapp/src/utils/props.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,17 @@ export type BuildPropOption<T, D extends BuildPropType<T, V, C>, R, V, C> = {
default?: R extends true
? never
: D extends Record<string, unknown> | Array<any>
? () => D
: (() => D) | D;
? () => D
: (() => D) | D;
validator?: ((val: any) => val is C) | ((val: any) => boolean);
};

type _BuildPropType<T, V, C> =
| (T extends PropWrapper<unknown>
? T[typeof wrapperKey]
: [V] extends [never]
? ResolvePropTypeWithReadonly<T>
: never)
? ResolvePropTypeWithReadonly<T>
: never)
| V
| C;
export type BuildPropType<T, V, C> = _BuildPropType<
Expand All @@ -53,8 +53,8 @@ type _BuildPropDefault<T, D> = [T] extends [
]
? D
: D extends () => T
? ReturnType<D>
: D;
? ReturnType<D>
: D;

export type BuildPropDefault<T, D, R> = R extends true
? { readonly default?: undefined }
Expand Down Expand Up @@ -146,12 +146,12 @@ export const buildProps = <
[K in keyof O]: O[K] extends BuildPropReturn<any, any, any, any, any>
? O[K]
: [O[K]] extends NativePropType
? O[K]
: O[K] extends BuildPropOption<infer T, infer D, infer R, infer V, infer C>
? D extends BuildPropType<T, V, C>
? BuildPropOption<T, D, R, V, C>
: never
: never;
? O[K]
: O[K] extends BuildPropOption<infer T, infer D, infer R, infer V, infer C>
? D extends BuildPropType<T, V, C>
? BuildPropOption<T, D, R, V, C>
: never
: never;
},
>(
props: O,
Expand All @@ -162,20 +162,20 @@ export const buildProps = <
[K in keyof O]: O[K] extends { [propKey]: boolean }
? O[K]
: [O[K]] extends NativePropType
? O[K]
: O[K] extends BuildPropOption<
infer T,
// eslint-disable-next-line @typescript-eslint/no-unused-vars
infer _D,
infer R,
infer V,
infer C
>
? BuildPropReturn<T, O[K]['default'], R, V, C>
: never;
? O[K]
: O[K] extends BuildPropOption<
infer T,
// eslint-disable-next-line @typescript-eslint/no-unused-vars
infer _D,
infer R,
infer V,
infer C
>
? BuildPropReturn<T, O[K]['default'], R, V, C>
: never;
};

export const definePropType = <T>(val: any) => ({ [wrapperKey]: val }) as PropWrapper<T>;
export const definePropType = <T>(val: any) => ({ [wrapperKey]: val } as PropWrapper<T>);

export const keyOf = <T extends Object>(arr: T) => Object.keys(arr) as Array<keyof T>;
export const mutable = <T extends readonly any[] | Record<string, unknown>>(val: T) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@ export function getColumns(): BasicColumn[] {
text === ErrorTypeEnum.VUE
? 'green'
: text === ErrorTypeEnum.RESOURCE
? 'cyan'
: text === ErrorTypeEnum.PROMISE
? 'blue'
: ErrorTypeEnum.AJAX
? 'red'
: 'purple';
? 'cyan'
: text === ErrorTypeEnum.PROMISE
? 'blue'
: ErrorTypeEnum.AJAX
? 'red'
: 'purple';
return <Tag color={color}>{() => text}</Tag>;
},
},
Expand Down
Loading
Loading