diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/mybatis/interceptor/PostgreSQLQueryInterceptor.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/mybatis/interceptor/PostgreSQLQueryInterceptor.java index 80a8ce6788..4933868ac4 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/mybatis/interceptor/PostgreSQLQueryInterceptor.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/mybatis/interceptor/PostgreSQLQueryInterceptor.java @@ -62,15 +62,10 @@ public Object intercept(final Invocation invocation) throws Throwable { RowBounds rowBounds = (RowBounds) args[2]; ResultHandler resultHandler = (ResultHandler) args[3]; Executor executor = (Executor) invocation.getTarget(); - CacheKey cacheKey; - BoundSql boundSql; - if (args.length == 4) { - boundSql = ms.getBoundSql(parameter); - cacheKey = executor.createCacheKey(ms, parameter, rowBounds, boundSql); - } else { - cacheKey = (CacheKey) args[4]; - boundSql = (BoundSql) args[5]; - } + boolean fourLen = args.length == 4; + BoundSql boundSql = fourLen ? ms.getBoundSql(parameter) : (BoundSql) args[5]; + CacheKey cacheKey = + fourLen ? executor.createCacheKey(ms, parameter, rowBounds, boundSql) : (CacheKey) args[4]; return executor.query(ms, parameter, rowBounds, resultHandler, cacheKey, boundSql); } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/ObjectUtils.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/ObjectUtils.java index dc8a7b2a35..90b3f97239 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/ObjectUtils.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/ObjectUtils.java @@ -25,10 +25,8 @@ private ObjectUtils() {} public static boolean trimEquals(Object o1, Object o2) { boolean equals = Objects.deepEquals(o1, o2); - if (!equals) { - if (o1 instanceof String && o2 instanceof String) { - return o1.toString().trim().equals(o2.toString().trim()); - } + if (!equals && o1 instanceof String && o2 instanceof String) { + return o1.toString().trim().equals(o2.toString().trim()); } return equals; } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java index 788992e900..f3c6f1fe37 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java @@ -405,9 +405,8 @@ public RestResponse checkSavepointPath(Application app) throws Exception { String error = applicationInfoService.checkSavepointPath(app); if (error == null) { return RestResponse.success(true); - } else { - return RestResponse.success(false).message(error); } + return RestResponse.success(false).message(error); } @Operation(summary = "Get application on k8s deploy logs") diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkPodTemplateController.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkPodTemplateController.java index abf2385a0c..9b7805ac89 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkPodTemplateController.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkPodTemplateController.java @@ -74,18 +74,15 @@ public RestResponse completeHostAlias(String hosts, String podTemplate) { private Map covertHostsParamToMap(String hosts) { if (StringUtils.isBlank(hosts)) { return new HashMap<>(0); - } else { - return Arrays.stream(hosts.split(",")) - .filter(StringUtils::isNotBlank) - .map(String::trim) - .map(e -> e.split(":")) - .filter( - arr -> - arr.length == 2 - && StringUtils.isNotBlank(arr[0]) - && StringUtils.isNotBlank(arr[1])) - .collect(Collectors.toMap(arr -> arr[0], arr -> arr[1])); } + return Arrays.stream(hosts.split(",")) + .filter(StringUtils::isNotBlank) + .map(String::trim) + .map(e -> e.split(":")) + .filter( + arr -> + arr.length == 2 && StringUtils.isNotBlank(arr[0]) && StringUtils.isNotBlank(arr[1])) + .collect(Collectors.toMap(arr -> arr[0], arr -> arr[1])); } @Operation(summary = "Extract host-alias from pod template") diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkSqlController.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkSqlController.java index 2de639c510..73e3db3325 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkSqlController.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkSqlController.java @@ -82,9 +82,8 @@ public RestResponse verify(String sql, Long versionId, Long teamId) { .put(END, flinkSqlValidationResult.errorLine() + 1); } return response; - } else { - return RestResponse.success(true); } + return RestResponse.success(true); } @Operation(summary = "List the application sql") diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java index 09deab73da..ba8bc259b1 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java @@ -363,10 +363,10 @@ public boolean cpFailedTrigger() { } public boolean eqFlinkJob(Application other) { - if (this.isFlinkSqlJob() && other.isFlinkSqlJob()) { - if (this.getFlinkSql().trim().equals(other.getFlinkSql().trim())) { - return this.getDependencyObject().equals(other.getDependencyObject()); - } + if (this.isFlinkSqlJob() + && other.isFlinkSqlJob() + && this.getFlinkSql().trim().equals(other.getFlinkSql().trim())) { + return this.getDependencyObject().equals(other.getDependencyObject()); } return false; } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/FlinkAppStateEnum.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/FlinkAppStateEnum.java index 77963c3211..151484f75d 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/FlinkAppStateEnum.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/FlinkAppStateEnum.java @@ -155,9 +155,8 @@ public static class Bridge { public static FlinkAppStateEnum fromK8sFlinkJobState(Enumeration.Value flinkJobState) { if (FlinkJobStateEnum.K8S_INITIALIZING() == flinkJobState) { return INITIALIZING; - } else { - return of(flinkJobState.toString()); } + return of(flinkJobState.toString()); } /** covert to org.apache.streampark.flink.k8s.enums.FlinkJobState */ diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/metrics/flink/CheckPoints.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/metrics/flink/CheckPoints.java index 812d84ea31..56f75186b0 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/metrics/flink/CheckPoints.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/metrics/flink/CheckPoints.java @@ -80,7 +80,8 @@ public CheckPointStatusEnum getCheckPointStatus() { public CheckPointTypeEnum getCheckPointType() { if ("CHECKPOINT".equals(this.checkpointType)) { return CheckPointTypeEnum.CHECKPOINT; - } else if ("SAVEPOINT".equals(this.checkpointType)) { + } + if ("SAVEPOINT".equals(this.checkpointType)) { return CheckPointTypeEnum.SAVEPOINT; } return CheckPointTypeEnum.SYNC_SAVEPOINT; diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java index b8f8b8a4ba..176da5fd75 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java @@ -403,7 +403,7 @@ public AppExistsStateEnum checkExists(Application appParam) { return AppExistsStateEnum.IN_YARN; } // check whether clusterId, namespace, jobId on kubernetes - else if (FlinkExecutionMode.isKubernetesMode(appParam.getExecutionMode()) + if (FlinkExecutionMode.isKubernetesMode(appParam.getExecutionMode()) && k8SFlinkTrackMonitor.checkIsInRemoteCluster(toTrackId(appParam))) { return AppExistsStateEnum.IN_KUBERNETES; } @@ -419,7 +419,7 @@ else if (FlinkExecutionMode.isKubernetesMode(appParam.getExecutionMode()) return AppExistsStateEnum.IN_YARN; } // check whether clusterId, namespace, jobId on kubernetes - else if (FlinkExecutionMode.isKubernetesMode(appParam.getExecutionMode()) + if (FlinkExecutionMode.isKubernetesMode(appParam.getExecutionMode()) && k8SFlinkTrackMonitor.checkIsInRemoteCluster(toTrackId(appParam))) { return AppExistsStateEnum.IN_KUBERNETES; } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java index 8c5bce4378..247ff807a9 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java @@ -239,16 +239,15 @@ public IPage page(Application appParam, RestRequest request) { } Page page = new MybatisPager().getDefaultPage(request); - if (ArrayUtils.isNotEmpty(appParam.getStateArray())) { - if (Arrays.stream(appParam.getStateArray()) - .anyMatch(x -> x == FlinkAppStateEnum.FINISHED.getValue())) { - Integer[] newArray = - ArrayUtils.insert( - appParam.getStateArray().length, - appParam.getStateArray(), - FlinkAppStateEnum.POS_TERMINATED.getValue()); - appParam.setStateArray(newArray); - } + if (ArrayUtils.isNotEmpty(appParam.getStateArray()) + && Arrays.stream(appParam.getStateArray()) + .anyMatch(x -> x == FlinkAppStateEnum.FINISHED.getValue())) { + Integer[] newArray = + ArrayUtils.insert( + appParam.getStateArray().length, + appParam.getStateArray(), + FlinkAppStateEnum.POS_TERMINATED.getValue()); + appParam.setStateArray(newArray); } this.baseMapper.selectPage(page, appParam); List records = page.getRecords(); @@ -492,21 +491,8 @@ public boolean update(Application appParam) { } // 2) k8s podTemplate changed. - if (application.getBuild() - && FlinkExecutionMode.isKubernetesMode(appParam.getExecutionMode())) { - if (ObjectUtils.trimNoEquals( - application.getK8sRestExposedType(), appParam.getK8sRestExposedType()) - || ObjectUtils.trimNoEquals( - application.getK8sJmPodTemplate(), appParam.getK8sJmPodTemplate()) - || ObjectUtils.trimNoEquals( - application.getK8sTmPodTemplate(), appParam.getK8sTmPodTemplate()) - || ObjectUtils.trimNoEquals( - application.getK8sPodTemplates(), appParam.getK8sPodTemplates()) - || ObjectUtils.trimNoEquals( - application.getK8sHadoopIntegration(), appParam.getK8sHadoopIntegration()) - || ObjectUtils.trimNoEquals(application.getFlinkImage(), appParam.getFlinkImage())) { - application.setBuild(true); - } + if (application.getBuild() && isK8sPodTemplateChanged(application, appParam)) { + application.setBuild(true); } // 3) flink version changed @@ -516,13 +502,8 @@ public boolean update(Application appParam) { } // 4) yarn application mode change - if (!application.getBuild()) { - if (!application.getExecutionMode().equals(appParam.getExecutionMode())) { - if (FlinkExecutionMode.YARN_APPLICATION == appParam.getFlinkExecutionMode() - || FlinkExecutionMode.YARN_APPLICATION == application.getFlinkExecutionMode()) { - application.setBuild(true); - } - } + if (!application.getBuild() && isYarnApplicationModeChange(application, appParam)) { + application.setBuild(true); } appParam.setJobType(application.getJobType()); @@ -841,4 +822,25 @@ private boolean isYarnNotDefaultQueue(Application application) { return FlinkExecutionMode.isYarnPerJobOrAppMode(application.getFlinkExecutionMode()) && !yarnQueueService.isDefaultQueue(application.getYarnQueue()); } + + private boolean isK8sPodTemplateChanged(Application application, Application appParam) { + return FlinkExecutionMode.isKubernetesMode(appParam.getExecutionMode()) + && (ObjectUtils.trimNoEquals( + application.getK8sRestExposedType(), appParam.getK8sRestExposedType()) + || ObjectUtils.trimNoEquals( + application.getK8sJmPodTemplate(), appParam.getK8sJmPodTemplate()) + || ObjectUtils.trimNoEquals( + application.getK8sTmPodTemplate(), appParam.getK8sTmPodTemplate()) + || ObjectUtils.trimNoEquals( + application.getK8sPodTemplates(), appParam.getK8sPodTemplates()) + || ObjectUtils.trimNoEquals( + application.getK8sHadoopIntegration(), appParam.getK8sHadoopIntegration()) + || ObjectUtils.trimNoEquals(application.getFlinkImage(), appParam.getFlinkImage())); + } + + private boolean isYarnApplicationModeChange(Application application, Application appParam) { + return !application.getExecutionMode().equals(appParam.getExecutionMode()) + && (FlinkExecutionMode.YARN_APPLICATION == appParam.getFlinkExecutionMode() + || FlinkExecutionMode.YARN_APPLICATION == application.getFlinkExecutionMode()); + } } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java index c04829104f..db827a626e 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java @@ -517,9 +517,8 @@ private BuildPipeline createPipelineInstance(@Nonnull Application app) { log.info("Submit params to building pipeline : {}", k8sApplicationBuildRequest); if (K8sFlinkConfig.isV2Enabled()) { return FlinkK8sApplicationBuildPipelineV2.of(k8sApplicationBuildRequest); - } else { - return FlinkK8sApplicationBuildPipeline.of(k8sApplicationBuildRequest); } + return FlinkK8sApplicationBuildPipeline.of(k8sApplicationBuildRequest); default: throw new UnsupportedOperationException( "Unsupported Building Application for ExecutionMode: " + app.getFlinkExecutionMode()); @@ -602,9 +601,8 @@ public boolean saveEntity(AppBuildPipeline pipe) { AppBuildPipeline old = getById(pipe.getAppId()); if (old == null) { return save(pipe); - } else { - return updateById(pipe); } + return updateById(pipe); } private void checkOrElseUploadJar( diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkEnvServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkEnvServiceImpl.java index 1883c36a5c..988917d56e 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkEnvServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkEnvServiceImpl.java @@ -133,10 +133,7 @@ public FlinkEnv getDefault() { @Override public FlinkEnv getByIdOrDefault(Long id) { FlinkEnv flinkEnv = getById(id); - if (flinkEnv == null) { - return getDefault(); - } - return flinkEnv; + return flinkEnv == null ? getDefault() : flinkEnv; } @Override diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java index 5360ff78af..7449032d06 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java @@ -109,9 +109,8 @@ public RestResponse create(Project project) { if (status) { return response.message("Add project successfully").data(true); - } else { - return response.message("Add project failed").data(false); } + return response.message("Add project failed").data(false); } @Override diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java index 608ab02805..60afb07c15 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java @@ -307,7 +307,8 @@ private Map tryGetRestProps(Application application, FlinkCluste private String getClusterId(Application application, FlinkCluster cluster) { if (FlinkExecutionMode.isKubernetesMode(application.getExecutionMode())) { return application.getClusterId(); - } else if (FlinkExecutionMode.isYarnMode(application.getExecutionMode())) { + } + if (FlinkExecutionMode.isYarnMode(application.getExecutionMode())) { if (FlinkExecutionMode.YARN_SESSION == application.getFlinkExecutionMode()) { Utils.notNull( cluster, @@ -315,9 +316,8 @@ private String getClusterId(Application application, FlinkCluster cluster) { "The yarn session clusterId=%s cannot be find, maybe the clusterId is wrong or the cluster has been deleted. Please contact the Admin.", application.getFlinkClusterId())); return cluster.getClusterId(); - } else { - return application.getAppId(); } + return application.getAppId(); } return null; } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SqlCompleteServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SqlCompleteServiceImpl.java index 5ce506db88..8405c55d1c 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SqlCompleteServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SqlCompleteServiceImpl.java @@ -82,9 +82,8 @@ public int compareTo(WordWithFrequency other) { // This step is very critical. If the same name is not judged and the count is different, // then the set collection will default to the same element, and it will be overwritten. return this.word.compareTo(other.word) * -1; - } else { - return num * -1; } + return num * -1; } @Override diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java index 52e9de29ab..033631e6ce 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java @@ -436,10 +436,9 @@ private void handleJobOverview(Application application, JobsOverview.Job jobOver if (application.getStartTime() == null || startTime != application.getStartTime().getTime()) { application.setStartTime(new Date(startTime)); } - if (endTime != -1) { - if (application.getEndTime() == null || endTime != application.getEndTime().getTime()) { - application.setEndTime(new Date(endTime)); - } + if (endTime != -1 + && (application.getEndTime() == null || endTime != application.getEndTime().getTime())) { + application.setEndTime(new Date(endTime)); } application.setJobId(jobOverview.getId()); @@ -676,19 +675,18 @@ private YarnAppInfo httpYarnAppInfo(Application application) throws Exception { private Overview httpOverview(Application application) throws IOException { String appId = application.getAppId(); - if (appId != null) { - if (FlinkExecutionMode.YARN_APPLICATION == application.getFlinkExecutionMode() - || FlinkExecutionMode.YARN_PER_JOB == application.getFlinkExecutionMode()) { - String reqURL; - if (StringUtils.isBlank(application.getJobManagerUrl())) { - String format = "proxy/%s/overview"; - reqURL = String.format(format, appId); - } else { - String format = "%s/overview"; - reqURL = String.format(format, application.getJobManagerUrl()); - } - return yarnRestRequest(reqURL, Overview.class); + if (appId != null + && (FlinkExecutionMode.YARN_APPLICATION == application.getFlinkExecutionMode() + || FlinkExecutionMode.YARN_PER_JOB == application.getFlinkExecutionMode())) { + String reqURL; + if (StringUtils.isBlank(application.getJobManagerUrl())) { + String format = "proxy/%s/overview"; + reqURL = String.format(format, appId); + } else { + String format = "%s/overview"; + reqURL = String.format(format, application.getJobManagerUrl()); } + return yarnRestRequest(reqURL, Overview.class); } return null; }