Skip to content

Commit

Permalink
[Improve] spark-app proxy support.
Browse files Browse the repository at this point in the history
  • Loading branch information
wolfboys committed Sep 29, 2024
1 parent 185859e commit e523013
Show file tree
Hide file tree
Showing 30 changed files with 126 additions and 411 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,17 @@
import org.apache.streampark.console.base.exception.ApiAlertException;
import org.apache.streampark.console.core.entity.ApplicationLog;
import org.apache.streampark.console.core.entity.FlinkApplication;
import org.apache.streampark.console.core.entity.SparkApplication;
import org.apache.streampark.console.core.enums.EngineTypeEnum;
import org.apache.streampark.console.core.enums.UserTypeEnum;
import org.apache.streampark.console.core.service.ProxyService;
import org.apache.streampark.console.core.service.application.ApplicationLogService;
import org.apache.streampark.console.core.service.application.FlinkApplicationManageService;
import org.apache.streampark.console.core.service.application.SparkApplicationManageService;
import org.apache.streampark.console.core.util.ServiceHelper;
import org.apache.streampark.console.system.entity.Member;
import org.apache.streampark.console.system.entity.User;
import org.apache.streampark.console.system.service.MemberService;
import org.apache.streampark.console.system.service.UserService;

import org.apache.shiro.authz.annotation.RequiresPermissions;

Expand All @@ -53,16 +55,16 @@ public class ProxyController {
private ProxyService proxyService;

@Autowired
private FlinkApplicationManageService applicationManageService;
private FlinkApplicationManageService flinkApplicationManageService;

@Autowired
private ApplicationLogService logService;
private SparkApplicationManageService sparkApplicationManageService;

@Autowired
private MemberService memberService;
private ApplicationLogService logService;

@Autowired
private UserService userService;
private MemberService memberService;

@GetMapping("{type}/{id}/assets/**")
public ResponseEntity<?> proxyFlinkAssets(HttpServletRequest request, @PathVariable("type") String type,
Expand All @@ -79,15 +81,17 @@ public ResponseEntity<?> proxyFlink(HttpServletRequest request, @PathVariable("t

private ResponseEntity<?> proxy(String type, HttpServletRequest request, Long id) throws Exception {
ApplicationLog log;
FlinkApplication app;

switch (type) {
case "flink":
app = applicationManageService.getApp(id);
checkProxyApp(app);
return proxyService.proxyFlink(request, app);
case "cluster":
return proxyService.proxyCluster(request, id);
FlinkApplication flinkApplication = flinkApplicationManageService.getApp(id);
checkProxyApp(flinkApplication.getTeamId());
return proxyService.proxyFlink(request, flinkApplication);
case "spark":
SparkApplication sparkApplication = sparkApplicationManageService.getApp(id);
checkProxyApp(sparkApplication.getTeamId());
return proxyService.proxySpark(request, sparkApplication);
case "flink_cluster":
return proxyService.proxyFlinkCluster(request, id);
case "history":
log = logService.getById(id);
checkProxyAppLog(log);
Expand All @@ -101,22 +105,26 @@ private ResponseEntity<?> proxy(String type, HttpServletRequest request, Long id
}
}

private void checkProxyApp(FlinkApplication app) {
ApiAlertException.throwIfNull(app, "Invalid operation, application is invalid.");

private void checkProxyApp(Long teamId) {
User user = ServiceHelper.getLoginUser();
ApiAlertException.throwIfNull(user, "Permission denied, please login first.");

if (user.getUserType() != UserTypeEnum.ADMIN) {
Member member = memberService.getByTeamIdUserName(app.getTeamId(), user.getUsername());
Member member = memberService.getByTeamIdUserName(teamId, user.getUsername());
ApiAlertException.throwIfNull(member,
"Permission denied, this job not created by the current user, And the job cannot be found in the current user's team.");
}
}

private void checkProxyAppLog(ApplicationLog log) {
ApiAlertException.throwIfNull(log, "Invalid operation, The application log not found.");
FlinkApplication app = applicationManageService.getById(log.getAppId());
checkProxyApp(app);
if (log.getJobType() == EngineTypeEnum.FLINK.getCode()) {
FlinkApplication app = flinkApplicationManageService.getById(log.getAppId());
checkProxyApp(app.getTeamId());
}
if (log.getJobType() == EngineTypeEnum.SPARK.getCode()) {
SparkApplication app = sparkApplicationManageService.getById(log.getAppId());
checkProxyApp(app.getTeamId());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,15 @@
import org.apache.streampark.console.base.domain.RestResponse;
import org.apache.streampark.console.base.exception.InternalException;
import org.apache.streampark.console.core.annotation.AppChangeEvent;
import org.apache.streampark.console.core.entity.ApplicationLog;
import org.apache.streampark.console.core.entity.FlinkApplicationBackUp;
import org.apache.streampark.console.core.entity.SparkApplication;
import org.apache.streampark.console.core.entity.SparkApplicationLog;
import org.apache.streampark.console.core.enums.AppExistsStateEnum;
import org.apache.streampark.console.core.service.ResourceService;
import org.apache.streampark.console.core.service.application.ApplicationLogService;
import org.apache.streampark.console.core.service.application.FlinkApplicationBackUpService;
import org.apache.streampark.console.core.service.application.SparkApplicationActionService;
import org.apache.streampark.console.core.service.application.SparkApplicationInfoService;
import org.apache.streampark.console.core.service.application.SparkApplicationLogService;
import org.apache.streampark.console.core.service.application.SparkApplicationManageService;

import org.apache.shiro.authz.annotation.RequiresPermissions;
Expand Down Expand Up @@ -69,7 +69,7 @@ public class SparkApplicationController {
private FlinkApplicationBackUpService backUpService;

@Autowired
private SparkApplicationLogService applicationLogService;
private ApplicationLogService applicationLogService;

@Autowired
private ResourceService resourceService;
Expand Down Expand Up @@ -202,8 +202,8 @@ public RestResponse backups(FlinkApplicationBackUp backUp, RestRequest request)
}

@PostMapping("opt_log")
public RestResponse optionlog(SparkApplicationLog applicationLog, RestRequest request) {
IPage<SparkApplicationLog> applicationList = applicationLogService.getPage(applicationLog, request);
public RestResponse optionlog(ApplicationLog applicationLog, RestRequest request) {
IPage<ApplicationLog> applicationList = applicationLogService.getPage(applicationLog, request);
return RestResponse.success(applicationList);
}

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public class SparkApplication extends BaseEntity {
* driver pod name for spark on K8s.(will be supported in the future)
*/
@TableField(updateStrategy = FieldStrategy.IGNORED)
private String appId;
private String clusterId;

private String yarnQueue;

Expand Down Expand Up @@ -509,7 +509,7 @@ public int hashCode() {
public static class SFunc {

public static final SFunction<SparkApplication, Long> ID = SparkApplication::getId;
public static final SFunction<SparkApplication, String> APP_ID = SparkApplication::getAppId;
public static final SFunction<SparkApplication, String> APP_ID = SparkApplication::getClusterId;
public static final SFunction<SparkApplication, Date> START_TIME = SparkApplication::getStartTime;
public static final SFunction<SparkApplication, Date> END_TIME = SparkApplication::getEndTime;
public static final SFunction<SparkApplication, Long> DURATION = SparkApplication::getDuration;
Expand Down

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import org.apache.streampark.console.core.entity.ApplicationLog;
import org.apache.streampark.console.core.entity.FlinkApplication;
import org.apache.streampark.console.core.entity.SparkApplicationLog;
import org.apache.streampark.console.core.entity.SparkApplication;

import org.springframework.http.ResponseEntity;

Expand All @@ -29,11 +29,11 @@ public interface ProxyService {

ResponseEntity<?> proxyFlink(HttpServletRequest request, FlinkApplication app) throws Exception;

ResponseEntity<?> proxyYarn(HttpServletRequest request, SparkApplicationLog log) throws Exception;
ResponseEntity<?> proxySpark(HttpServletRequest request, SparkApplication app) throws Exception;

ResponseEntity<?> proxyYarn(HttpServletRequest request, ApplicationLog log) throws Exception;

ResponseEntity<?> proxyHistory(HttpServletRequest request, ApplicationLog log) throws Exception;

ResponseEntity<?> proxyCluster(HttpServletRequest request, Long clusterId) throws Exception;
ResponseEntity<?> proxyFlinkCluster(HttpServletRequest request, Long clusterId) throws Exception;
}
Loading

0 comments on commit e523013

Please sign in to comment.