Skip to content

Commit

Permalink
[Improve] spark-job support proxy (#4101)
Browse files Browse the repository at this point in the history
* [Bug] spark-job update bug fixed.

* [Improve] spark-app run state minor improvement

* [Improve] spark-job detail page style improvements

* [Improve] spark-job i18n improvements

* [Improve] spark-app cancel bug fixed

* [Improve] spark-job build pipeline improvements

* [Improve] code style improvements

* [Improve] spark build-request bug fixed.

* [Improve] spark-app proxy support.

* [Improve] check tracking minor improvements
  • Loading branch information
wolfboys authored Sep 29, 2024
1 parent 975ad4f commit a0541f1
Show file tree
Hide file tree
Showing 68 changed files with 372 additions and 660 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@ public enum FlinkJobType {
*/
@Nonnull
public static FlinkJobType of(@Nullable Integer value) {
for (FlinkJobType flinkDevelopmentMode : values()) {
if (flinkDevelopmentMode.mode.equals(value)) {
return flinkDevelopmentMode;
for (FlinkJobType flinkJobType : values()) {
if (flinkJobType.mode.equals(value)) {
return flinkJobType;
}
}
return FlinkJobType.UNKNOWN;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@ public enum SparkJobType {
*/
@Nonnull
public static SparkJobType valueOf(@Nullable Integer value) {
for (SparkJobType sparkDevelopmentMode : values()) {
if (sparkDevelopmentMode.mode.equals(value)) {
return sparkDevelopmentMode;
for (SparkJobType sparkJobType : values()) {
if (sparkJobType.mode.equals(value)) {
return sparkJobType;
}
}
return SparkJobType.UNKNOWN;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,6 @@ public class AppControl {

/** allow to build the application */
private boolean allowBuild;

private boolean allowView;
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
@Validated
@RestController
@RequestMapping("flink/pipe")
public class FlinkApplicationBuildPipelineController {
public class FlinkBuildPipelineController {

@Autowired
private AppBuildPipeService appBuildPipeService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,17 @@
import org.apache.streampark.console.base.exception.ApiAlertException;
import org.apache.streampark.console.core.entity.ApplicationLog;
import org.apache.streampark.console.core.entity.FlinkApplication;
import org.apache.streampark.console.core.entity.SparkApplication;
import org.apache.streampark.console.core.enums.EngineTypeEnum;
import org.apache.streampark.console.core.enums.UserTypeEnum;
import org.apache.streampark.console.core.service.ProxyService;
import org.apache.streampark.console.core.service.application.ApplicationLogService;
import org.apache.streampark.console.core.service.application.FlinkApplicationManageService;
import org.apache.streampark.console.core.service.application.SparkApplicationManageService;
import org.apache.streampark.console.core.util.ServiceHelper;
import org.apache.streampark.console.system.entity.Member;
import org.apache.streampark.console.system.entity.User;
import org.apache.streampark.console.system.service.MemberService;
import org.apache.streampark.console.system.service.UserService;

import org.apache.shiro.authz.annotation.RequiresPermissions;

Expand All @@ -53,16 +55,16 @@ public class ProxyController {
private ProxyService proxyService;

@Autowired
private FlinkApplicationManageService applicationManageService;
private FlinkApplicationManageService flinkApplicationManageService;

@Autowired
private ApplicationLogService logService;
private SparkApplicationManageService sparkApplicationManageService;

@Autowired
private MemberService memberService;
private ApplicationLogService logService;

@Autowired
private UserService userService;
private MemberService memberService;

@GetMapping("{type}/{id}/assets/**")
public ResponseEntity<?> proxyFlinkAssets(HttpServletRequest request, @PathVariable("type") String type,
Expand All @@ -79,15 +81,17 @@ public ResponseEntity<?> proxyFlink(HttpServletRequest request, @PathVariable("t

private ResponseEntity<?> proxy(String type, HttpServletRequest request, Long id) throws Exception {
ApplicationLog log;
FlinkApplication app;

switch (type) {
case "flink":
app = applicationManageService.getApp(id);
checkProxyApp(app);
return proxyService.proxyFlink(request, app);
case "cluster":
return proxyService.proxyCluster(request, id);
FlinkApplication flinkApplication = flinkApplicationManageService.getApp(id);
checkProxyApp(flinkApplication.getTeamId());
return proxyService.proxyFlink(request, flinkApplication);
case "spark":
SparkApplication sparkApplication = sparkApplicationManageService.getApp(id);
checkProxyApp(sparkApplication.getTeamId());
return proxyService.proxySpark(request, sparkApplication);
case "flink_cluster":
return proxyService.proxyFlinkCluster(request, id);
case "history":
log = logService.getById(id);
checkProxyAppLog(log);
Expand All @@ -101,22 +105,26 @@ private ResponseEntity<?> proxy(String type, HttpServletRequest request, Long id
}
}

private void checkProxyApp(FlinkApplication app) {
ApiAlertException.throwIfNull(app, "Invalid operation, application is invalid.");

private void checkProxyApp(Long teamId) {
User user = ServiceHelper.getLoginUser();
ApiAlertException.throwIfNull(user, "Permission denied, please login first.");

if (user.getUserType() != UserTypeEnum.ADMIN) {
Member member = memberService.getByTeamIdUserName(app.getTeamId(), user.getUsername());
Member member = memberService.getByTeamIdUserName(teamId, user.getUsername());
ApiAlertException.throwIfNull(member,
"Permission denied, this job not created by the current user, And the job cannot be found in the current user's team.");
}
}

private void checkProxyAppLog(ApplicationLog log) {
ApiAlertException.throwIfNull(log, "Invalid operation, The application log not found.");
FlinkApplication app = applicationManageService.getById(log.getAppId());
checkProxyApp(app);
if (log.getJobType() == EngineTypeEnum.FLINK.getCode()) {
FlinkApplication app = flinkApplicationManageService.getById(log.getAppId());
checkProxyApp(app.getTeamId());
}
if (log.getJobType() == EngineTypeEnum.SPARK.getCode()) {
SparkApplication app = sparkApplicationManageService.getById(log.getAppId());
checkProxyApp(app.getTeamId());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,15 @@
import org.apache.streampark.console.base.domain.RestResponse;
import org.apache.streampark.console.base.exception.InternalException;
import org.apache.streampark.console.core.annotation.AppChangeEvent;
import org.apache.streampark.console.core.entity.ApplicationLog;
import org.apache.streampark.console.core.entity.FlinkApplicationBackUp;
import org.apache.streampark.console.core.entity.SparkApplication;
import org.apache.streampark.console.core.entity.SparkApplicationLog;
import org.apache.streampark.console.core.enums.AppExistsStateEnum;
import org.apache.streampark.console.core.service.ResourceService;
import org.apache.streampark.console.core.service.application.ApplicationLogService;
import org.apache.streampark.console.core.service.application.FlinkApplicationBackUpService;
import org.apache.streampark.console.core.service.application.SparkApplicationActionService;
import org.apache.streampark.console.core.service.application.SparkApplicationInfoService;
import org.apache.streampark.console.core.service.application.SparkApplicationLogService;
import org.apache.streampark.console.core.service.application.SparkApplicationManageService;

import org.apache.shiro.authz.annotation.RequiresPermissions;
Expand Down Expand Up @@ -69,7 +69,7 @@ public class SparkApplicationController {
private FlinkApplicationBackUpService backUpService;

@Autowired
private SparkApplicationLogService applicationLogService;
private ApplicationLogService applicationLogService;

@Autowired
private ResourceService resourceService;
Expand Down Expand Up @@ -152,8 +152,8 @@ public RestResponse start(SparkApplication app) {

@PostMapping("cancel")
@RequiresPermissions("app:cancel")
public RestResponse stop(SparkApplication app) throws Exception {
applicationActionService.stop(app);
public RestResponse cancel(SparkApplication app) throws Exception {
applicationActionService.cancel(app);
return RestResponse.success();
}

Expand Down Expand Up @@ -202,8 +202,8 @@ public RestResponse backups(FlinkApplicationBackUp backUp, RestRequest request)
}

@PostMapping("opt_log")
public RestResponse optionlog(SparkApplicationLog applicationLog, RestRequest request) {
IPage<SparkApplicationLog> applicationList = applicationLogService.getPage(applicationLog, request);
public RestResponse optionlog(ApplicationLog applicationLog, RestRequest request) {
IPage<ApplicationLog> applicationList = applicationLogService.getPage(applicationLog, request);
return RestResponse.success(applicationList);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
@Validated
@RestController
@RequestMapping("spark/pipe")
public class SparkApplicationBuildPipelineController {
public class SparkBuildPipelineController {

@Autowired
private SparkAppBuildPipeService appBuildPipeService;
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -276,8 +276,8 @@ public void setState(Integer state) {
}

public void setYarnQueueByHotParams() {
if (!(FlinkDeployMode.YARN_APPLICATION == this.getFlinkDeployMode()
|| FlinkDeployMode.YARN_PER_JOB == this.getFlinkDeployMode())) {
if (!(FlinkDeployMode.YARN_APPLICATION == this.getDeployModeEnum()
|| FlinkDeployMode.YARN_PER_JOB == this.getDeployModeEnum())) {
return;
}

Expand Down Expand Up @@ -341,7 +341,7 @@ public ReleaseStateEnum getReleaseState() {
}

@JsonIgnore
public FlinkJobType getDevelopmentMode() {
public FlinkJobType getJobTypeEnum() {
return FlinkJobType.of(jobType);
}

Expand All @@ -356,7 +356,7 @@ public FlinkK8sRestExposedType getK8sRestExposedTypeEnum() {
}

@JsonIgnore
public FlinkDeployMode getFlinkDeployMode() {
public FlinkDeployMode getDeployModeEnum() {
return FlinkDeployMode.of(deployMode);
}

Expand Down Expand Up @@ -400,7 +400,7 @@ public String getRemoteAppHome() {
/** Automatically identify remoteAppHome or localAppHome based on app FlinkDeployMode */
@JsonIgnore
public String getAppHome() {
switch (this.getFlinkDeployMode()) {
switch (this.getDeployModeEnum()) {
case KUBERNETES_NATIVE_APPLICATION:
case KUBERNETES_NATIVE_SESSION:
case YARN_PER_JOB:
Expand All @@ -412,7 +412,7 @@ public String getAppHome() {
return getRemoteAppHome();
default:
throw new UnsupportedOperationException(
"unsupported deployMode ".concat(getFlinkDeployMode().getName()));
"unsupported deployMode ".concat(getDeployModeEnum().getName()));
}
}

Expand Down Expand Up @@ -558,7 +558,7 @@ public void updateHotParams(FlinkApplication appParam) {
if (appParam != this) {
this.hotParams = null;
}
FlinkDeployMode deployModeEnum = appParam.getFlinkDeployMode();
FlinkDeployMode deployModeEnum = appParam.getDeployModeEnum();
Map<String, String> hotParams = new HashMap<>(0);
if (needFillYarnQueueLabel(deployModeEnum)) {
hotParams.putAll(YarnQueueLabelExpression.getQueueLabelMap(appParam.getYarnQueue()));
Expand Down Expand Up @@ -594,7 +594,7 @@ public int hashCode() {
}

public boolean isKubernetesModeJob() {
return FlinkDeployMode.isKubernetesMode(this.getFlinkDeployMode());
return FlinkDeployMode.isKubernetesMode(this.getDeployModeEnum());
}

public static class SFunc {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public FlinkApplicationBackUp(FlinkApplication application) {
}

private void renderPath(FlinkApplication application) {
switch (application.getFlinkDeployMode()) {
switch (application.getDeployModeEnum()) {
case KUBERNETES_NATIVE_APPLICATION:
case KUBERNETES_NATIVE_SESSION:
case YARN_PER_JOB:
Expand All @@ -80,7 +80,7 @@ private void renderPath(FlinkApplication application) {
break;
default:
throw new UnsupportedOperationException(
"unsupported deployMode ".concat(application.getFlinkDeployMode().getName()));
"unsupported deployMode ".concat(application.getDeployModeEnum().getName()));
}
}
}
Loading

0 comments on commit a0541f1

Please sign in to comment.