Skip to content

Commit

Permalink
[Improve] Improve streampark-console module base on 3.6 Control/Condi…
Browse files Browse the repository at this point in the history
…tion Statements (#3328)
  • Loading branch information
eagleli123 authored Nov 25, 2023
1 parent 6e98f4c commit 39dfd17
Show file tree
Hide file tree
Showing 16 changed files with 81 additions and 100 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,15 +62,10 @@ public Object intercept(final Invocation invocation) throws Throwable {
RowBounds rowBounds = (RowBounds) args[2];
ResultHandler<?> resultHandler = (ResultHandler<?>) args[3];
Executor executor = (Executor) invocation.getTarget();
CacheKey cacheKey;
BoundSql boundSql;
if (args.length == 4) {
boundSql = ms.getBoundSql(parameter);
cacheKey = executor.createCacheKey(ms, parameter, rowBounds, boundSql);
} else {
cacheKey = (CacheKey) args[4];
boundSql = (BoundSql) args[5];
}
boolean fourLen = args.length == 4;
BoundSql boundSql = fourLen ? ms.getBoundSql(parameter) : (BoundSql) args[5];
CacheKey cacheKey =
fourLen ? executor.createCacheKey(ms, parameter, rowBounds, boundSql) : (CacheKey) args[4];
return executor.query(ms, parameter, rowBounds, resultHandler, cacheKey, boundSql);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,8 @@ private ObjectUtils() {}

public static boolean trimEquals(Object o1, Object o2) {
boolean equals = Objects.deepEquals(o1, o2);
if (!equals) {
if (o1 instanceof String && o2 instanceof String) {
return o1.toString().trim().equals(o2.toString().trim());
}
if (!equals && o1 instanceof String && o2 instanceof String) {
return o1.toString().trim().equals(o2.toString().trim());
}
return equals;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -405,9 +405,8 @@ public RestResponse checkSavepointPath(Application app) throws Exception {
String error = applicationInfoService.checkSavepointPath(app);
if (error == null) {
return RestResponse.success(true);
} else {
return RestResponse.success(false).message(error);
}
return RestResponse.success(false).message(error);
}

@Operation(summary = "Get application on k8s deploy logs")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,18 +74,15 @@ public RestResponse completeHostAlias(String hosts, String podTemplate) {
private Map<String, String> covertHostsParamToMap(String hosts) {
if (StringUtils.isBlank(hosts)) {
return new HashMap<>(0);
} else {
return Arrays.stream(hosts.split(","))
.filter(StringUtils::isNotBlank)
.map(String::trim)
.map(e -> e.split(":"))
.filter(
arr ->
arr.length == 2
&& StringUtils.isNotBlank(arr[0])
&& StringUtils.isNotBlank(arr[1]))
.collect(Collectors.toMap(arr -> arr[0], arr -> arr[1]));
}
return Arrays.stream(hosts.split(","))
.filter(StringUtils::isNotBlank)
.map(String::trim)
.map(e -> e.split(":"))
.filter(
arr ->
arr.length == 2 && StringUtils.isNotBlank(arr[0]) && StringUtils.isNotBlank(arr[1]))
.collect(Collectors.toMap(arr -> arr[0], arr -> arr[1]));
}

@Operation(summary = "Extract host-alias from pod template")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,8 @@ public RestResponse verify(String sql, Long versionId, Long teamId) {
.put(END, flinkSqlValidationResult.errorLine() + 1);
}
return response;
} else {
return RestResponse.success(true);
}
return RestResponse.success(true);
}

@Operation(summary = "List the application sql")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -363,10 +363,10 @@ public boolean cpFailedTrigger() {
}

public boolean eqFlinkJob(Application other) {
if (this.isFlinkSqlJob() && other.isFlinkSqlJob()) {
if (this.getFlinkSql().trim().equals(other.getFlinkSql().trim())) {
return this.getDependencyObject().equals(other.getDependencyObject());
}
if (this.isFlinkSqlJob()
&& other.isFlinkSqlJob()
&& this.getFlinkSql().trim().equals(other.getFlinkSql().trim())) {
return this.getDependencyObject().equals(other.getDependencyObject());
}
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,9 +155,8 @@ public static class Bridge {
public static FlinkAppStateEnum fromK8sFlinkJobState(Enumeration.Value flinkJobState) {
if (FlinkJobStateEnum.K8S_INITIALIZING() == flinkJobState) {
return INITIALIZING;
} else {
return of(flinkJobState.toString());
}
return of(flinkJobState.toString());
}

/** covert to org.apache.streampark.flink.k8s.enums.FlinkJobState */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ public CheckPointStatusEnum getCheckPointStatus() {
public CheckPointTypeEnum getCheckPointType() {
if ("CHECKPOINT".equals(this.checkpointType)) {
return CheckPointTypeEnum.CHECKPOINT;
} else if ("SAVEPOINT".equals(this.checkpointType)) {
}
if ("SAVEPOINT".equals(this.checkpointType)) {
return CheckPointTypeEnum.SAVEPOINT;
}
return CheckPointTypeEnum.SYNC_SAVEPOINT;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,7 @@ public AppExistsStateEnum checkExists(Application appParam) {
return AppExistsStateEnum.IN_YARN;
}
// check whether clusterId, namespace, jobId on kubernetes
else if (FlinkExecutionMode.isKubernetesMode(appParam.getExecutionMode())
if (FlinkExecutionMode.isKubernetesMode(appParam.getExecutionMode())
&& k8SFlinkTrackMonitor.checkIsInRemoteCluster(toTrackId(appParam))) {
return AppExistsStateEnum.IN_KUBERNETES;
}
Expand All @@ -419,7 +419,7 @@ else if (FlinkExecutionMode.isKubernetesMode(appParam.getExecutionMode())
return AppExistsStateEnum.IN_YARN;
}
// check whether clusterId, namespace, jobId on kubernetes
else if (FlinkExecutionMode.isKubernetesMode(appParam.getExecutionMode())
if (FlinkExecutionMode.isKubernetesMode(appParam.getExecutionMode())
&& k8SFlinkTrackMonitor.checkIsInRemoteCluster(toTrackId(appParam))) {
return AppExistsStateEnum.IN_KUBERNETES;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,16 +239,15 @@ public IPage<Application> page(Application appParam, RestRequest request) {
}
Page<Application> page = new MybatisPager<Application>().getDefaultPage(request);

if (ArrayUtils.isNotEmpty(appParam.getStateArray())) {
if (Arrays.stream(appParam.getStateArray())
.anyMatch(x -> x == FlinkAppStateEnum.FINISHED.getValue())) {
Integer[] newArray =
ArrayUtils.insert(
appParam.getStateArray().length,
appParam.getStateArray(),
FlinkAppStateEnum.POS_TERMINATED.getValue());
appParam.setStateArray(newArray);
}
if (ArrayUtils.isNotEmpty(appParam.getStateArray())
&& Arrays.stream(appParam.getStateArray())
.anyMatch(x -> x == FlinkAppStateEnum.FINISHED.getValue())) {
Integer[] newArray =
ArrayUtils.insert(
appParam.getStateArray().length,
appParam.getStateArray(),
FlinkAppStateEnum.POS_TERMINATED.getValue());
appParam.setStateArray(newArray);
}
this.baseMapper.selectPage(page, appParam);
List<Application> records = page.getRecords();
Expand Down Expand Up @@ -492,21 +491,8 @@ public boolean update(Application appParam) {
}

// 2) k8s podTemplate changed.
if (application.getBuild()
&& FlinkExecutionMode.isKubernetesMode(appParam.getExecutionMode())) {
if (ObjectUtils.trimNoEquals(
application.getK8sRestExposedType(), appParam.getK8sRestExposedType())
|| ObjectUtils.trimNoEquals(
application.getK8sJmPodTemplate(), appParam.getK8sJmPodTemplate())
|| ObjectUtils.trimNoEquals(
application.getK8sTmPodTemplate(), appParam.getK8sTmPodTemplate())
|| ObjectUtils.trimNoEquals(
application.getK8sPodTemplates(), appParam.getK8sPodTemplates())
|| ObjectUtils.trimNoEquals(
application.getK8sHadoopIntegration(), appParam.getK8sHadoopIntegration())
|| ObjectUtils.trimNoEquals(application.getFlinkImage(), appParam.getFlinkImage())) {
application.setBuild(true);
}
if (application.getBuild() && isK8sPodTemplateChanged(application, appParam)) {
application.setBuild(true);
}

// 3) flink version changed
Expand All @@ -516,13 +502,8 @@ public boolean update(Application appParam) {
}

// 4) yarn application mode change
if (!application.getBuild()) {
if (!application.getExecutionMode().equals(appParam.getExecutionMode())) {
if (FlinkExecutionMode.YARN_APPLICATION == appParam.getFlinkExecutionMode()
|| FlinkExecutionMode.YARN_APPLICATION == application.getFlinkExecutionMode()) {
application.setBuild(true);
}
}
if (!application.getBuild() && isYarnApplicationModeChange(application, appParam)) {
application.setBuild(true);
}

appParam.setJobType(application.getJobType());
Expand Down Expand Up @@ -841,4 +822,25 @@ private boolean isYarnNotDefaultQueue(Application application) {
return FlinkExecutionMode.isYarnPerJobOrAppMode(application.getFlinkExecutionMode())
&& !yarnQueueService.isDefaultQueue(application.getYarnQueue());
}

private boolean isK8sPodTemplateChanged(Application application, Application appParam) {
return FlinkExecutionMode.isKubernetesMode(appParam.getExecutionMode())
&& (ObjectUtils.trimNoEquals(
application.getK8sRestExposedType(), appParam.getK8sRestExposedType())
|| ObjectUtils.trimNoEquals(
application.getK8sJmPodTemplate(), appParam.getK8sJmPodTemplate())
|| ObjectUtils.trimNoEquals(
application.getK8sTmPodTemplate(), appParam.getK8sTmPodTemplate())
|| ObjectUtils.trimNoEquals(
application.getK8sPodTemplates(), appParam.getK8sPodTemplates())
|| ObjectUtils.trimNoEquals(
application.getK8sHadoopIntegration(), appParam.getK8sHadoopIntegration())
|| ObjectUtils.trimNoEquals(application.getFlinkImage(), appParam.getFlinkImage()));
}

private boolean isYarnApplicationModeChange(Application application, Application appParam) {
return !application.getExecutionMode().equals(appParam.getExecutionMode())
&& (FlinkExecutionMode.YARN_APPLICATION == appParam.getFlinkExecutionMode()
|| FlinkExecutionMode.YARN_APPLICATION == application.getFlinkExecutionMode());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -517,9 +517,8 @@ private BuildPipeline createPipelineInstance(@Nonnull Application app) {
log.info("Submit params to building pipeline : {}", k8sApplicationBuildRequest);
if (K8sFlinkConfig.isV2Enabled()) {
return FlinkK8sApplicationBuildPipelineV2.of(k8sApplicationBuildRequest);
} else {
return FlinkK8sApplicationBuildPipeline.of(k8sApplicationBuildRequest);
}
return FlinkK8sApplicationBuildPipeline.of(k8sApplicationBuildRequest);
default:
throw new UnsupportedOperationException(
"Unsupported Building Application for ExecutionMode: " + app.getFlinkExecutionMode());
Expand Down Expand Up @@ -602,9 +601,8 @@ public boolean saveEntity(AppBuildPipeline pipe) {
AppBuildPipeline old = getById(pipe.getAppId());
if (old == null) {
return save(pipe);
} else {
return updateById(pipe);
}
return updateById(pipe);
}

private void checkOrElseUploadJar(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,10 +133,7 @@ public FlinkEnv getDefault() {
@Override
public FlinkEnv getByIdOrDefault(Long id) {
FlinkEnv flinkEnv = getById(id);
if (flinkEnv == null) {
return getDefault();
}
return flinkEnv;
return flinkEnv == null ? getDefault() : flinkEnv;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,8 @@ public RestResponse create(Project project) {

if (status) {
return response.message("Add project successfully").data(true);
} else {
return response.message("Add project failed").data(false);
}
return response.message("Add project failed").data(false);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -307,17 +307,17 @@ private Map<String, Object> tryGetRestProps(Application application, FlinkCluste
private String getClusterId(Application application, FlinkCluster cluster) {
if (FlinkExecutionMode.isKubernetesMode(application.getExecutionMode())) {
return application.getClusterId();
} else if (FlinkExecutionMode.isYarnMode(application.getExecutionMode())) {
}
if (FlinkExecutionMode.isYarnMode(application.getExecutionMode())) {
if (FlinkExecutionMode.YARN_SESSION == application.getFlinkExecutionMode()) {
Utils.notNull(
cluster,
String.format(
"The yarn session clusterId=%s cannot be find, maybe the clusterId is wrong or the cluster has been deleted. Please contact the Admin.",
application.getFlinkClusterId()));
return cluster.getClusterId();
} else {
return application.getAppId();
}
return application.getAppId();
}
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,8 @@ public int compareTo(WordWithFrequency other) {
// This step is very critical. If the same name is not judged and the count is different,
// then the set collection will default to the same element, and it will be overwritten.
return this.word.compareTo(other.word) * -1;
} else {
return num * -1;
}
return num * -1;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -436,10 +436,9 @@ private void handleJobOverview(Application application, JobsOverview.Job jobOver
if (application.getStartTime() == null || startTime != application.getStartTime().getTime()) {
application.setStartTime(new Date(startTime));
}
if (endTime != -1) {
if (application.getEndTime() == null || endTime != application.getEndTime().getTime()) {
application.setEndTime(new Date(endTime));
}
if (endTime != -1
&& (application.getEndTime() == null || endTime != application.getEndTime().getTime())) {
application.setEndTime(new Date(endTime));
}

application.setJobId(jobOverview.getId());
Expand Down Expand Up @@ -676,19 +675,18 @@ private YarnAppInfo httpYarnAppInfo(Application application) throws Exception {

private Overview httpOverview(Application application) throws IOException {
String appId = application.getAppId();
if (appId != null) {
if (FlinkExecutionMode.YARN_APPLICATION == application.getFlinkExecutionMode()
|| FlinkExecutionMode.YARN_PER_JOB == application.getFlinkExecutionMode()) {
String reqURL;
if (StringUtils.isBlank(application.getJobManagerUrl())) {
String format = "proxy/%s/overview";
reqURL = String.format(format, appId);
} else {
String format = "%s/overview";
reqURL = String.format(format, application.getJobManagerUrl());
}
return yarnRestRequest(reqURL, Overview.class);
if (appId != null
&& (FlinkExecutionMode.YARN_APPLICATION == application.getFlinkExecutionMode()
|| FlinkExecutionMode.YARN_PER_JOB == application.getFlinkExecutionMode())) {
String reqURL;
if (StringUtils.isBlank(application.getJobManagerUrl())) {
String format = "proxy/%s/overview";
reqURL = String.format(format, appId);
} else {
String format = "%s/overview";
reqURL = String.format(format, application.getJobManagerUrl());
}
return yarnRestRequest(reqURL, Overview.class);
}
return null;
}
Expand Down

0 comments on commit 39dfd17

Please sign in to comment.