Skip to content

Commit

Permalink
Improve streampark-console module base on [3.6 Control/Condition Stat…
Browse files Browse the repository at this point in the history
…ements]
  • Loading branch information
lihui committed Nov 10, 2023
1 parent 2489b67 commit 1a98611
Show file tree
Hide file tree
Showing 19 changed files with 134 additions and 150 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,20 +50,21 @@ public class UploadFileTypeInterceptor implements HandlerInterceptor {
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler)
throws Exception {
if (request instanceof MultipartHttpServletRequest) {
MultipartHttpServletRequest multipartRequest = (MultipartHttpServletRequest) request;
Map<String, MultipartFile> files = multipartRequest.getFileMap();
for (String file : files.keySet()) {
MultipartFile multipartFile = multipartRequest.getFile(file);
ApiAlertException.throwIfNull(
multipartFile, "File to upload can't be null. Upload file failed.");
boolean isJarOrPyFile =
FileUtils.isJarFileType(multipartFile.getInputStream())
|| isPythonFileType(multipartFile.getContentType(), multipartFile.getInputStream());
ApiAlertException.throwIfFalse(
isJarOrPyFile,
"Illegal file type, Only support standard jar files. Upload file failed.");
}
if (!(request instanceof MultipartHttpServletRequest)) {
return true;
}

MultipartHttpServletRequest multipartRequest = (MultipartHttpServletRequest) request;
Map<String, MultipartFile> files = multipartRequest.getFileMap();
for (String file : files.keySet()) {
MultipartFile multipartFile = multipartRequest.getFile(file);
ApiAlertException.throwIfNull(
multipartFile, "File to upload can't be null. Upload file failed.");
boolean isJarOrPyFile =
FileUtils.isJarFileType(multipartFile.getInputStream())
|| isPythonFileType(multipartFile.getContentType(), multipartFile.getInputStream());
ApiAlertException.throwIfFalse(
isJarOrPyFile, "Illegal file type, Only support standard jar files. Upload file failed.");
}
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,15 +62,10 @@ public Object intercept(final Invocation invocation) throws Throwable {
RowBounds rowBounds = (RowBounds) args[2];
ResultHandler<?> resultHandler = (ResultHandler<?>) args[3];
Executor executor = (Executor) invocation.getTarget();
CacheKey cacheKey;
BoundSql boundSql;
if (args.length == 4) {
boundSql = ms.getBoundSql(parameter);
cacheKey = executor.createCacheKey(ms, parameter, rowBounds, boundSql);
} else {
cacheKey = (CacheKey) args[4];
boundSql = (BoundSql) args[5];
}
boolean fourLen = args.length == 4;
BoundSql boundSql = fourLen ? ms.getBoundSql(parameter) : (BoundSql) args[5];
CacheKey cacheKey =
fourLen ? executor.createCacheKey(ms, parameter, rowBounds, boundSql) : (CacheKey) args[4];
return executor.query(ms, parameter, rowBounds, resultHandler, cacheKey, boundSql);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,8 @@ private ObjectUtils() {}

public static boolean trimEquals(Object o1, Object o2) {
boolean equals = Objects.deepEquals(o1, o2);
if (!equals) {
if (o1 instanceof String && o2 instanceof String) {
return o1.toString().trim().equals(o2.toString().trim());
}
if (!equals && o1 instanceof String && o2 instanceof String) {
return o1.toString().trim().equals(o2.toString().trim());
}
return equals;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,33 +111,34 @@ public RestResponse permissionAction(ProceedingJoinPoint joinPoint) throws Throw
ApiAlertException.throwIfNull(currentUser, "Permission denied, please login first.");

boolean isAdmin = currentUser.getUserType() == UserTypeEnum.ADMIN;
if (isAdmin) {
return (RestResponse) joinPoint.proceed();
}

if (!isAdmin) {
PermissionTypeEnum permissionTypeEnum = permissionAction.type();
Long paramId = getParamId(joinPoint, methodSignature, permissionAction.id());

switch (permissionTypeEnum) {
case USER:
ApiAlertException.throwIfTrue(
!currentUser.getUserId().equals(paramId),
"Permission denied, only user himself can access this permission");
break;
case TEAM:
ApiAlertException.throwIfTrue(
memberService.getByTeamIdUserName(paramId, currentUser.getUsername()) == null,
"Permission denied, only user belongs to this team can access this permission");
break;
case APP:
Application app = applicationManageService.getById(paramId);
ApiAlertException.throwIfTrue(app == null, "Invalid operation, application is null");
ApiAlertException.throwIfTrue(
memberService.getByTeamIdUserName(app.getTeamId(), currentUser.getUsername()) == null,
"Permission denied, only user belongs to this team can access this permission");
break;
default:
throw new IllegalArgumentException(
String.format("Permission type %s is not supported.", permissionTypeEnum));
}
PermissionTypeEnum permissionTypeEnum = permissionAction.type();
Long paramId = getParamId(joinPoint, methodSignature, permissionAction.id());

switch (permissionTypeEnum) {
case USER:
ApiAlertException.throwIfTrue(
!currentUser.getUserId().equals(paramId),
"Permission denied, only user himself can access this permission");
break;
case TEAM:
ApiAlertException.throwIfTrue(
memberService.getByTeamIdUserName(paramId, currentUser.getUsername()) == null,
"Permission denied, only user belongs to this team can access this permission");
break;
case APP:
Application app = applicationManageService.getById(paramId);
ApiAlertException.throwIfTrue(app == null, "Invalid operation, application is null");
ApiAlertException.throwIfTrue(
memberService.getByTeamIdUserName(app.getTeamId(), currentUser.getUsername()) == null,
"Permission denied, only user belongs to this team can access this permission");
break;
default:
throw new IllegalArgumentException(
String.format("Permission type %s is not supported.", permissionTypeEnum));
}

return (RestResponse) joinPoint.proceed();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -405,9 +405,8 @@ public RestResponse checkSavepointPath(Application app) throws Exception {
String error = applicationInfoService.checkSavepointPath(app);
if (error == null) {
return RestResponse.success(true);
} else {
return RestResponse.success(false).message(error);
}
return RestResponse.success(false).message(error);
}

@Operation(summary = "Get application on k8s deploy logs")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,18 +74,15 @@ public RestResponse completeHostAlias(String hosts, String podTemplate) {
private Map<String, String> covertHostsParamToMap(String hosts) {
if (StringUtils.isBlank(hosts)) {
return new HashMap<>(0);
} else {
return Arrays.stream(hosts.split(","))
.filter(StringUtils::isNotBlank)
.map(String::trim)
.map(e -> e.split(":"))
.filter(
arr ->
arr.length == 2
&& StringUtils.isNotBlank(arr[0])
&& StringUtils.isNotBlank(arr[1]))
.collect(Collectors.toMap(arr -> arr[0], arr -> arr[1]));
}
return Arrays.stream(hosts.split(","))
.filter(StringUtils::isNotBlank)
.map(String::trim)
.map(e -> e.split(":"))
.filter(
arr ->
arr.length == 2 && StringUtils.isNotBlank(arr[0]) && StringUtils.isNotBlank(arr[1]))
.collect(Collectors.toMap(arr -> arr[0], arr -> arr[1]));
}

@Operation(summary = "Extract host-alias from pod template")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,8 @@ public RestResponse verify(String sql, Long versionId, Long teamId) {
.put(END, flinkSqlValidationResult.errorLine() + 1);
}
return response;
} else {
return RestResponse.success(true);
}
return RestResponse.success(true);
}

@Operation(summary = "List the application sql")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -363,10 +363,10 @@ public boolean cpFailedTrigger() {
}

public boolean eqFlinkJob(Application other) {
if (this.isFlinkSqlJob() && other.isFlinkSqlJob()) {
if (this.getFlinkSql().trim().equals(other.getFlinkSql().trim())) {
return this.getDependencyObject().equals(other.getDependencyObject());
}
if (this.isFlinkSqlJob()
&& other.isFlinkSqlJob()
&& this.getFlinkSql().trim().equals(other.getFlinkSql().trim())) {
return this.getDependencyObject().equals(other.getDependencyObject());
}
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,9 +155,8 @@ public static class Bridge {
public static FlinkAppStateEnum fromK8sFlinkJobState(Enumeration.Value flinkJobState) {
if (FlinkJobStateEnum.K8S_INITIALIZING() == flinkJobState) {
return INITIALIZING;
} else {
return of(flinkJobState.toString());
}
return of(flinkJobState.toString());
}

/** covert to org.apache.streampark.flink.k8s.enums.FlinkJobState */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ public CheckPointStatusEnum getCheckPointStatus() {
public CheckPointTypeEnum getCheckPointType() {
if ("CHECKPOINT".equals(this.checkpointType)) {
return CheckPointTypeEnum.CHECKPOINT;
} else if ("SAVEPOINT".equals(this.checkpointType)) {
}
if ("SAVEPOINT".equals(this.checkpointType)) {
return CheckPointTypeEnum.SAVEPOINT;
}
return CheckPointTypeEnum.SYNC_SAVEPOINT;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,7 @@ public AppExistsStateEnum checkExists(Application appParam) {
return AppExistsStateEnum.IN_YARN;
}
// check whether clusterId, namespace, jobId on kubernetes
else if (FlinkExecutionMode.isKubernetesMode(appParam.getExecutionMode())
if (FlinkExecutionMode.isKubernetesMode(appParam.getExecutionMode())
&& k8SFlinkTrackMonitor.checkIsInRemoteCluster(toTrackId(appParam))) {
return AppExistsStateEnum.IN_KUBERNETES;
}
Expand All @@ -419,7 +419,7 @@ else if (FlinkExecutionMode.isKubernetesMode(appParam.getExecutionMode())
return AppExistsStateEnum.IN_YARN;
}
// check whether clusterId, namespace, jobId on kubernetes
else if (FlinkExecutionMode.isKubernetesMode(appParam.getExecutionMode())
if (FlinkExecutionMode.isKubernetesMode(appParam.getExecutionMode())
&& k8SFlinkTrackMonitor.checkIsInRemoteCluster(toTrackId(appParam))) {
return AppExistsStateEnum.IN_KUBERNETES;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,16 +239,15 @@ public IPage<Application> page(Application appParam, RestRequest request) {
}
Page<Application> page = new MybatisPager<Application>().getDefaultPage(request);

if (ArrayUtils.isNotEmpty(appParam.getStateArray())) {
if (Arrays.stream(appParam.getStateArray())
.anyMatch(x -> x == FlinkAppStateEnum.FINISHED.getValue())) {
Integer[] newArray =
ArrayUtils.insert(
appParam.getStateArray().length,
appParam.getStateArray(),
FlinkAppStateEnum.POS_TERMINATED.getValue());
appParam.setStateArray(newArray);
}
if (ArrayUtils.isNotEmpty(appParam.getStateArray())
&& Arrays.stream(appParam.getStateArray())
.anyMatch(x -> x == FlinkAppStateEnum.FINISHED.getValue())) {
Integer[] newArray =
ArrayUtils.insert(
appParam.getStateArray().length,
appParam.getStateArray(),
FlinkAppStateEnum.POS_TERMINATED.getValue());
appParam.setStateArray(newArray);
}
this.baseMapper.selectPage(page, appParam);
List<Application> records = page.getRecords();
Expand Down Expand Up @@ -492,21 +491,8 @@ public boolean update(Application appParam) {
}

// 2) k8s podTemplate changed.
if (application.getBuild()
&& FlinkExecutionMode.isKubernetesMode(appParam.getExecutionMode())) {
if (ObjectUtils.trimNoEquals(
application.getK8sRestExposedType(), appParam.getK8sRestExposedType())
|| ObjectUtils.trimNoEquals(
application.getK8sJmPodTemplate(), appParam.getK8sJmPodTemplate())
|| ObjectUtils.trimNoEquals(
application.getK8sTmPodTemplate(), appParam.getK8sTmPodTemplate())
|| ObjectUtils.trimNoEquals(
application.getK8sPodTemplates(), appParam.getK8sPodTemplates())
|| ObjectUtils.trimNoEquals(
application.getK8sHadoopIntegration(), appParam.getK8sHadoopIntegration())
|| ObjectUtils.trimNoEquals(application.getFlinkImage(), appParam.getFlinkImage())) {
application.setBuild(true);
}
if (application.getBuild() && isK8sPodTemplateChanged(application, appParam)) {
application.setBuild(true);
}

// 3) flink version changed
Expand All @@ -516,13 +502,8 @@ public boolean update(Application appParam) {
}

// 4) yarn application mode change
if (!application.getBuild()) {
if (!application.getExecutionMode().equals(appParam.getExecutionMode())) {
if (FlinkExecutionMode.YARN_APPLICATION == appParam.getFlinkExecutionMode()
|| FlinkExecutionMode.YARN_APPLICATION == application.getFlinkExecutionMode()) {
application.setBuild(true);
}
}
if (!application.getBuild() && isYarnApplicationModeChange(application, appParam)) {
application.setBuild(true);
}

appParam.setJobType(application.getJobType());
Expand Down Expand Up @@ -841,4 +822,25 @@ private boolean isYarnNotDefaultQueue(Application application) {
return FlinkExecutionMode.isYarnPerJobOrAppMode(application.getFlinkExecutionMode())
&& !yarnQueueService.isDefaultQueue(application.getYarnQueue());
}

private boolean isK8sPodTemplateChanged(Application application, Application appParam) {
return FlinkExecutionMode.isKubernetesMode(appParam.getExecutionMode())
&& (ObjectUtils.trimNoEquals(
application.getK8sRestExposedType(), appParam.getK8sRestExposedType())
|| ObjectUtils.trimNoEquals(
application.getK8sJmPodTemplate(), appParam.getK8sJmPodTemplate())
|| ObjectUtils.trimNoEquals(
application.getK8sTmPodTemplate(), appParam.getK8sTmPodTemplate())
|| ObjectUtils.trimNoEquals(
application.getK8sPodTemplates(), appParam.getK8sPodTemplates())
|| ObjectUtils.trimNoEquals(
application.getK8sHadoopIntegration(), appParam.getK8sHadoopIntegration())
|| ObjectUtils.trimNoEquals(application.getFlinkImage(), appParam.getFlinkImage()));
}

private boolean isYarnApplicationModeChange(Application application, Application appParam) {
return !application.getExecutionMode().equals(appParam.getExecutionMode())
&& (FlinkExecutionMode.YARN_APPLICATION == appParam.getFlinkExecutionMode()
|| FlinkExecutionMode.YARN_APPLICATION == application.getFlinkExecutionMode());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -517,9 +517,8 @@ private BuildPipeline createPipelineInstance(@Nonnull Application app) {
log.info("Submit params to building pipeline : {}", k8sApplicationBuildRequest);
if (K8sFlinkConfig.isV2Enabled()) {
return FlinkK8sApplicationBuildPipelineV2.of(k8sApplicationBuildRequest);
} else {
return FlinkK8sApplicationBuildPipeline.of(k8sApplicationBuildRequest);
}
return FlinkK8sApplicationBuildPipeline.of(k8sApplicationBuildRequest);
default:
throw new UnsupportedOperationException(
"Unsupported Building Application for ExecutionMode: " + app.getFlinkExecutionMode());
Expand Down Expand Up @@ -602,9 +601,8 @@ public boolean saveEntity(AppBuildPipeline pipe) {
AppBuildPipeline old = getById(pipe.getAppId());
if (old == null) {
return save(pipe);
} else {
return updateById(pipe);
}
return updateById(pipe);
}

private void checkOrElseUploadJar(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,10 +133,7 @@ public FlinkEnv getDefault() {
@Override
public FlinkEnv getByIdOrDefault(Long id) {
FlinkEnv flinkEnv = getById(id);
if (flinkEnv == null) {
return getDefault();
}
return flinkEnv;
return flinkEnv == null ? getDefault() : flinkEnv;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,8 @@ public RestResponse create(Project project) {

if (status) {
return response.message("Add project successfully").data(true);
} else {
return response.message("Add project failed").data(false);
}
return response.message("Add project failed").data(false);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -307,17 +307,17 @@ private Map<String, Object> tryGetRestProps(Application application, FlinkCluste
private String getClusterId(Application application, FlinkCluster cluster) {
if (FlinkExecutionMode.isKubernetesMode(application.getExecutionMode())) {
return application.getClusterId();
} else if (FlinkExecutionMode.isYarnMode(application.getExecutionMode())) {
}
if (FlinkExecutionMode.isYarnMode(application.getExecutionMode())) {
if (FlinkExecutionMode.YARN_SESSION == application.getFlinkExecutionMode()) {
Utils.notNull(
cluster,
String.format(
"The yarn session clusterId=%s cannot be find, maybe the clusterId is wrong or the cluster has been deleted. Please contact the Admin.",
application.getFlinkClusterId()));
return cluster.getClusterId();
} else {
return application.getAppId();
}
return application.getAppId();
}
return null;
}
Expand Down
Loading

0 comments on commit 1a98611

Please sign in to comment.