Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Improve] Improve streampark-console module base on 3.6 Control/Condition Statements #3328

Merged
merged 3 commits into from
Nov 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -62,15 +62,10 @@ public Object intercept(final Invocation invocation) throws Throwable {
RowBounds rowBounds = (RowBounds) args[2];
ResultHandler<?> resultHandler = (ResultHandler<?>) args[3];
Executor executor = (Executor) invocation.getTarget();
CacheKey cacheKey;
BoundSql boundSql;
if (args.length == 4) {
boundSql = ms.getBoundSql(parameter);
cacheKey = executor.createCacheKey(ms, parameter, rowBounds, boundSql);
} else {
cacheKey = (CacheKey) args[4];
boundSql = (BoundSql) args[5];
}
boolean fourLen = args.length == 4;
BoundSql boundSql = fourLen ? ms.getBoundSql(parameter) : (BoundSql) args[5];
CacheKey cacheKey =
fourLen ? executor.createCacheKey(ms, parameter, rowBounds, boundSql) : (CacheKey) args[4];
return executor.query(ms, parameter, rowBounds, resultHandler, cacheKey, boundSql);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,8 @@ private ObjectUtils() {}

public static boolean trimEquals(Object o1, Object o2) {
boolean equals = Objects.deepEquals(o1, o2);
if (!equals) {
if (o1 instanceof String && o2 instanceof String) {
return o1.toString().trim().equals(o2.toString().trim());
}
if (!equals && o1 instanceof String && o2 instanceof String) {
return o1.toString().trim().equals(o2.toString().trim());
}
return equals;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -405,9 +405,8 @@ public RestResponse checkSavepointPath(Application app) throws Exception {
String error = applicationInfoService.checkSavepointPath(app);
if (error == null) {
return RestResponse.success(true);
} else {
return RestResponse.success(false).message(error);
}
return RestResponse.success(false).message(error);
}

@Operation(summary = "Get application on k8s deploy logs")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,18 +74,15 @@ public RestResponse completeHostAlias(String hosts, String podTemplate) {
private Map<String, String> covertHostsParamToMap(String hosts) {
if (StringUtils.isBlank(hosts)) {
return new HashMap<>(0);
} else {
return Arrays.stream(hosts.split(","))
.filter(StringUtils::isNotBlank)
.map(String::trim)
.map(e -> e.split(":"))
.filter(
arr ->
arr.length == 2
&& StringUtils.isNotBlank(arr[0])
&& StringUtils.isNotBlank(arr[1]))
.collect(Collectors.toMap(arr -> arr[0], arr -> arr[1]));
}
return Arrays.stream(hosts.split(","))
.filter(StringUtils::isNotBlank)
.map(String::trim)
.map(e -> e.split(":"))
.filter(
arr ->
arr.length == 2 && StringUtils.isNotBlank(arr[0]) && StringUtils.isNotBlank(arr[1]))
.collect(Collectors.toMap(arr -> arr[0], arr -> arr[1]));
}

@Operation(summary = "Extract host-alias from pod template")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,8 @@ public RestResponse verify(String sql, Long versionId, Long teamId) {
.put(END, flinkSqlValidationResult.errorLine() + 1);
}
return response;
} else {
return RestResponse.success(true);
}
return RestResponse.success(true);
}

@Operation(summary = "List the application sql")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -363,10 +363,10 @@ public boolean cpFailedTrigger() {
}

public boolean eqFlinkJob(Application other) {
if (this.isFlinkSqlJob() && other.isFlinkSqlJob()) {
if (this.getFlinkSql().trim().equals(other.getFlinkSql().trim())) {
return this.getDependencyObject().equals(other.getDependencyObject());
}
if (this.isFlinkSqlJob()
&& other.isFlinkSqlJob()
&& this.getFlinkSql().trim().equals(other.getFlinkSql().trim())) {
return this.getDependencyObject().equals(other.getDependencyObject());
}
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,9 +155,8 @@ public static class Bridge {
public static FlinkAppStateEnum fromK8sFlinkJobState(Enumeration.Value flinkJobState) {
if (FlinkJobStateEnum.K8S_INITIALIZING() == flinkJobState) {
return INITIALIZING;
} else {
return of(flinkJobState.toString());
}
return of(flinkJobState.toString());
}

/** covert to org.apache.streampark.flink.k8s.enums.FlinkJobState */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ public CheckPointStatusEnum getCheckPointStatus() {
public CheckPointTypeEnum getCheckPointType() {
if ("CHECKPOINT".equals(this.checkpointType)) {
return CheckPointTypeEnum.CHECKPOINT;
} else if ("SAVEPOINT".equals(this.checkpointType)) {
}
if ("SAVEPOINT".equals(this.checkpointType)) {
return CheckPointTypeEnum.SAVEPOINT;
}
return CheckPointTypeEnum.SYNC_SAVEPOINT;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,7 @@ public AppExistsStateEnum checkExists(Application appParam) {
return AppExistsStateEnum.IN_YARN;
}
// check whether clusterId, namespace, jobId on kubernetes
else if (FlinkExecutionMode.isKubernetesMode(appParam.getExecutionMode())
if (FlinkExecutionMode.isKubernetesMode(appParam.getExecutionMode())
&& k8SFlinkTrackMonitor.checkIsInRemoteCluster(toTrackId(appParam))) {
return AppExistsStateEnum.IN_KUBERNETES;
}
Expand All @@ -419,7 +419,7 @@ else if (FlinkExecutionMode.isKubernetesMode(appParam.getExecutionMode())
return AppExistsStateEnum.IN_YARN;
}
// check whether clusterId, namespace, jobId on kubernetes
else if (FlinkExecutionMode.isKubernetesMode(appParam.getExecutionMode())
if (FlinkExecutionMode.isKubernetesMode(appParam.getExecutionMode())
&& k8SFlinkTrackMonitor.checkIsInRemoteCluster(toTrackId(appParam))) {
return AppExistsStateEnum.IN_KUBERNETES;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,16 +239,15 @@ public IPage<Application> page(Application appParam, RestRequest request) {
}
Page<Application> page = new MybatisPager<Application>().getDefaultPage(request);

if (ArrayUtils.isNotEmpty(appParam.getStateArray())) {
if (Arrays.stream(appParam.getStateArray())
.anyMatch(x -> x == FlinkAppStateEnum.FINISHED.getValue())) {
Integer[] newArray =
ArrayUtils.insert(
appParam.getStateArray().length,
appParam.getStateArray(),
FlinkAppStateEnum.POS_TERMINATED.getValue());
appParam.setStateArray(newArray);
}
if (ArrayUtils.isNotEmpty(appParam.getStateArray())
&& Arrays.stream(appParam.getStateArray())
.anyMatch(x -> x == FlinkAppStateEnum.FINISHED.getValue())) {
Integer[] newArray =
ArrayUtils.insert(
appParam.getStateArray().length,
appParam.getStateArray(),
FlinkAppStateEnum.POS_TERMINATED.getValue());
appParam.setStateArray(newArray);
}
this.baseMapper.selectPage(page, appParam);
List<Application> records = page.getRecords();
Expand Down Expand Up @@ -492,21 +491,8 @@ public boolean update(Application appParam) {
}

// 2) k8s podTemplate changed.
if (application.getBuild()
&& FlinkExecutionMode.isKubernetesMode(appParam.getExecutionMode())) {
if (ObjectUtils.trimNoEquals(
application.getK8sRestExposedType(), appParam.getK8sRestExposedType())
|| ObjectUtils.trimNoEquals(
application.getK8sJmPodTemplate(), appParam.getK8sJmPodTemplate())
|| ObjectUtils.trimNoEquals(
application.getK8sTmPodTemplate(), appParam.getK8sTmPodTemplate())
|| ObjectUtils.trimNoEquals(
application.getK8sPodTemplates(), appParam.getK8sPodTemplates())
|| ObjectUtils.trimNoEquals(
application.getK8sHadoopIntegration(), appParam.getK8sHadoopIntegration())
|| ObjectUtils.trimNoEquals(application.getFlinkImage(), appParam.getFlinkImage())) {
application.setBuild(true);
}
if (application.getBuild() && isK8sPodTemplateChanged(application, appParam)) {
application.setBuild(true);
}

// 3) flink version changed
Expand All @@ -516,13 +502,8 @@ public boolean update(Application appParam) {
}

// 4) yarn application mode change
if (!application.getBuild()) {
if (!application.getExecutionMode().equals(appParam.getExecutionMode())) {
if (FlinkExecutionMode.YARN_APPLICATION == appParam.getFlinkExecutionMode()
|| FlinkExecutionMode.YARN_APPLICATION == application.getFlinkExecutionMode()) {
application.setBuild(true);
}
}
if (!application.getBuild() && isYarnApplicationModeChange(application, appParam)) {
application.setBuild(true);
}

appParam.setJobType(application.getJobType());
Expand Down Expand Up @@ -841,4 +822,25 @@ private boolean isYarnNotDefaultQueue(Application application) {
return FlinkExecutionMode.isYarnPerJobOrAppMode(application.getFlinkExecutionMode())
&& !yarnQueueService.isDefaultQueue(application.getYarnQueue());
}

private boolean isK8sPodTemplateChanged(Application application, Application appParam) {
return FlinkExecutionMode.isKubernetesMode(appParam.getExecutionMode())
&& (ObjectUtils.trimNoEquals(
application.getK8sRestExposedType(), appParam.getK8sRestExposedType())
|| ObjectUtils.trimNoEquals(
application.getK8sJmPodTemplate(), appParam.getK8sJmPodTemplate())
|| ObjectUtils.trimNoEquals(
application.getK8sTmPodTemplate(), appParam.getK8sTmPodTemplate())
|| ObjectUtils.trimNoEquals(
application.getK8sPodTemplates(), appParam.getK8sPodTemplates())
|| ObjectUtils.trimNoEquals(
application.getK8sHadoopIntegration(), appParam.getK8sHadoopIntegration())
|| ObjectUtils.trimNoEquals(application.getFlinkImage(), appParam.getFlinkImage()));
}

private boolean isYarnApplicationModeChange(Application application, Application appParam) {
return !application.getExecutionMode().equals(appParam.getExecutionMode())
&& (FlinkExecutionMode.YARN_APPLICATION == appParam.getFlinkExecutionMode()
|| FlinkExecutionMode.YARN_APPLICATION == application.getFlinkExecutionMode());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -517,9 +517,8 @@ private BuildPipeline createPipelineInstance(@Nonnull Application app) {
log.info("Submit params to building pipeline : {}", k8sApplicationBuildRequest);
if (K8sFlinkConfig.isV2Enabled()) {
return FlinkK8sApplicationBuildPipelineV2.of(k8sApplicationBuildRequest);
} else {
return FlinkK8sApplicationBuildPipeline.of(k8sApplicationBuildRequest);
}
return FlinkK8sApplicationBuildPipeline.of(k8sApplicationBuildRequest);
default:
throw new UnsupportedOperationException(
"Unsupported Building Application for ExecutionMode: " + app.getFlinkExecutionMode());
Expand Down Expand Up @@ -602,9 +601,8 @@ public boolean saveEntity(AppBuildPipeline pipe) {
AppBuildPipeline old = getById(pipe.getAppId());
if (old == null) {
return save(pipe);
} else {
return updateById(pipe);
}
return updateById(pipe);
}

private void checkOrElseUploadJar(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,10 +133,7 @@ public FlinkEnv getDefault() {
@Override
public FlinkEnv getByIdOrDefault(Long id) {
FlinkEnv flinkEnv = getById(id);
if (flinkEnv == null) {
return getDefault();
}
return flinkEnv;
return flinkEnv == null ? getDefault() : flinkEnv;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,8 @@ public RestResponse create(Project project) {

if (status) {
return response.message("Add project successfully").data(true);
} else {
return response.message("Add project failed").data(false);
}
return response.message("Add project failed").data(false);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -307,17 +307,17 @@ private Map<String, Object> tryGetRestProps(Application application, FlinkCluste
private String getClusterId(Application application, FlinkCluster cluster) {
if (FlinkExecutionMode.isKubernetesMode(application.getExecutionMode())) {
return application.getClusterId();
} else if (FlinkExecutionMode.isYarnMode(application.getExecutionMode())) {
}
if (FlinkExecutionMode.isYarnMode(application.getExecutionMode())) {
if (FlinkExecutionMode.YARN_SESSION == application.getFlinkExecutionMode()) {
Utils.notNull(
cluster,
String.format(
"The yarn session clusterId=%s cannot be find, maybe the clusterId is wrong or the cluster has been deleted. Please contact the Admin.",
application.getFlinkClusterId()));
return cluster.getClusterId();
} else {
return application.getAppId();
}
return application.getAppId();
}
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,8 @@ public int compareTo(WordWithFrequency other) {
// This step is very critical. If the same name is not judged and the count is different,
// then the set collection will default to the same element, and it will be overwritten.
return this.word.compareTo(other.word) * -1;
} else {
return num * -1;
}
return num * -1;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -436,10 +436,9 @@ private void handleJobOverview(Application application, JobsOverview.Job jobOver
if (application.getStartTime() == null || startTime != application.getStartTime().getTime()) {
application.setStartTime(new Date(startTime));
}
if (endTime != -1) {
if (application.getEndTime() == null || endTime != application.getEndTime().getTime()) {
application.setEndTime(new Date(endTime));
}
if (endTime != -1
&& (application.getEndTime() == null || endTime != application.getEndTime().getTime())) {
application.setEndTime(new Date(endTime));
}

application.setJobId(jobOverview.getId());
Expand Down Expand Up @@ -676,19 +675,18 @@ private YarnAppInfo httpYarnAppInfo(Application application) throws Exception {

private Overview httpOverview(Application application) throws IOException {
String appId = application.getAppId();
if (appId != null) {
if (FlinkExecutionMode.YARN_APPLICATION == application.getFlinkExecutionMode()
|| FlinkExecutionMode.YARN_PER_JOB == application.getFlinkExecutionMode()) {
String reqURL;
if (StringUtils.isBlank(application.getJobManagerUrl())) {
String format = "proxy/%s/overview";
reqURL = String.format(format, appId);
} else {
String format = "%s/overview";
reqURL = String.format(format, application.getJobManagerUrl());
}
return yarnRestRequest(reqURL, Overview.class);
if (appId != null
&& (FlinkExecutionMode.YARN_APPLICATION == application.getFlinkExecutionMode()
|| FlinkExecutionMode.YARN_PER_JOB == application.getFlinkExecutionMode())) {
String reqURL;
if (StringUtils.isBlank(application.getJobManagerUrl())) {
String format = "proxy/%s/overview";
reqURL = String.format(format, appId);
} else {
String format = "%s/overview";
reqURL = String.format(format, application.getJobManagerUrl());
}
return yarnRestRequest(reqURL, Overview.class);
}
return null;
}
Expand Down
Loading