support spark task (#4102)
HxpSerein authored Sep 28, 2024
1 parent 37b3b26 commit 975ad4f
Showing 9 changed files with 225 additions and 83 deletions.
SparkTaskItem.java
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.streampark.console.core.bean;

import lombok.Data;

import java.io.Serializable;

@Data
public class SparkTaskItem implements Serializable {

/** Id of the application this task targets. */
private Long appId;

/** Whether to start the application automatically when the task is executed. */
private Boolean autoStart;

}
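
SparkTaskItem is a plain serializable bean, so it can be carried as the JSON payload of a distributed task record. A minimal round-trip sketch (not part of this commit; plain Jackson is assumed here rather than the project's JacksonUtils):

import org.apache.streampark.console.core.bean.SparkTaskItem;

import com.fasterxml.jackson.databind.ObjectMapper;

public class SparkTaskItemRoundTrip {

    public static void main(String[] args) throws Exception {
        SparkTaskItem item = new SparkTaskItem();
        item.setAppId(100001L); // setters generated by lombok @Data
        item.setAutoStart(true);

        ObjectMapper mapper = new ObjectMapper();
        String json = mapper.writeValueAsString(item);
        // json -> {"appId":100001,"autoStart":true}

        SparkTaskItem restored = mapper.readValue(json, SparkTaskItem.class);
        System.out.println(restored.getAppId() + ", " + restored.getAutoStart());
    }
}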
FlinkApplication.java
@@ -26,6 +26,7 @@
import org.apache.streampark.common.enums.FlinkK8sRestExposedType;
import org.apache.streampark.common.enums.StorageType;
import org.apache.streampark.common.fs.FsOperator;
import org.apache.streampark.console.base.mybatis.entity.BaseEntity;
import org.apache.streampark.console.base.util.JacksonUtils;
import org.apache.streampark.console.core.bean.AppControl;
import org.apache.streampark.console.core.bean.Dependency;
@@ -53,7 +54,6 @@
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

import java.io.Serializable;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
@@ -64,7 +64,7 @@
@Data
@TableName("t_flink_app")
@Slf4j
public class FlinkApplication implements Serializable {
public class FlinkApplication extends BaseEntity {

@TableId(type = IdType.INPUT)
private Long id;
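The switch from implements Serializable to extends BaseEntity is what lets the saveDistributedTask signature further below accept either application type through a single parameter. An illustrative call site (not in the commit), assuming SparkApplication gets the same treatment:

import org.apache.streampark.console.base.mybatis.entity.BaseEntity;
import org.apache.streampark.console.core.enums.DistributedTaskEnum;
import org.apache.streampark.console.core.service.DistributedTaskService;

public class DispatchSketch {

    /** Illustrative only: one signature now covers both application entities. */
    public static void dispatch(DistributedTaskService service, BaseEntity app) {
        // app may be a FlinkApplication or a SparkApplication
        service.saveDistributedTask(app, false, DistributedTaskEnum.START);
    }
}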
DistributedTaskEnum.java
@@ -45,7 +45,17 @@ public enum DistributedTaskEnum {
/**
* Forces the given application to stop.
*/
ABORT(4);
ABORT(4),

/**
* Stop the given application.
*/
STOP(5),

/**
* Forcibly stops the given application, cancelling any pending start/stop futures.
*/
FORCED_STOP(6);

private final int value;

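Each constant carries an int code, presumably the value persisted with the task record. If the enum does not already define one, a reverse-lookup helper inside DistributedTaskEnum might look like this (hypothetical sketch):

/** Hypothetical helper inside DistributedTaskEnum: maps a persisted code back to its constant. */
public static DistributedTaskEnum of(int value) {
    for (DistributedTaskEnum action : values()) {
        if (action.value == value) {
            return action;
        }
    }
    throw new IllegalArgumentException("Unknown DistributedTaskEnum value: " + value);
}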
DistributedTaskService.java
@@ -17,13 +17,12 @@

package org.apache.streampark.console.core.service;

import org.apache.streampark.console.base.mybatis.entity.BaseEntity;
import org.apache.streampark.console.core.entity.DistributedTask;
import org.apache.streampark.console.core.entity.FlinkApplication;
import org.apache.streampark.console.core.enums.DistributedTaskEnum;

import com.baomidou.mybatisplus.extension.service.IService;

import java.util.List;
import java.util.Set;

/**
@@ -40,16 +39,9 @@ public interface DistributedTaskService extends IService<DistributedTask> {

/**
* Executes the operation carried by a task record that was retrieved by polling the database.
* @param DistributedTask DistributedTask
* @param distributedTask distributedTask
*/
void executeDistributedTask(DistributedTask DistributedTask) throws Exception;

/**
* Through this interface, the watcher obtains the list of tasks that need to be monitored.
* @param applications List<Application>
* @return List<Application> List of tasks that need to be monitored
*/
List<FlinkApplication> getMonitoredTaskList(List<FlinkApplication> applications);
void executeDistributedTask(DistributedTask distributedTask) throws Exception;

/**
* This interface handles task redistribution when server nodes are added.
@@ -74,9 +66,9 @@ public interface DistributedTaskService extends IService<DistributedTask> {
/**
* Save Distributed Task.
*
* @param appParam Application
* @param appParam the application entity; may be a FlinkApplication or a SparkApplication
* @param autoStart whether to start the application automatically when the task is executed
* @param action the action to perform; one of START, RESTART, REVOKE, CANCEL, ABORT, STOP, FORCED_STOP
*/
public void saveDistributedTask(FlinkApplication appParam, boolean autoStart, DistributedTaskEnum action);
public void saveDistributedTask(BaseEntity appParam, boolean autoStart, DistributedTaskEnum action);
}
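
The executeDistributedTask contract implies a polling loop on each node, roughly like the following sketch (the claiming query and the surrounding scheduler are assumptions, not part of this commit):

import org.apache.streampark.console.core.entity.DistributedTask;
import org.apache.streampark.console.core.service.DistributedTaskService;

import lombok.extern.slf4j.Slf4j;

import java.util.List;

@Slf4j
public class DistributedTaskPollerSketch {

    private final DistributedTaskService distributedTaskService;

    public DistributedTaskPollerSketch(DistributedTaskService distributedTaskService) {
        this.distributedTaskService = distributedTaskService;
    }

    /** One illustrative polling pass: execute every task record claimed by this node. */
    public void pollOnce(List<DistributedTask> claimedTasks) {
        for (DistributedTask task : claimedTasks) {
            try {
                distributedTaskService.executeDistributedTask(task);
            } catch (Exception e) {
                log.error("Distributed task execution failed: {}", task, e);
            }
        }
    }
}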
SparkApplicationActionServiceImpl.java
@@ -39,11 +39,13 @@
import org.apache.streampark.console.core.entity.SparkEnv;
import org.apache.streampark.console.core.entity.SparkSql;
import org.apache.streampark.console.core.enums.ConfigFileTypeEnum;
import org.apache.streampark.console.core.enums.DistributedTaskEnum;
import org.apache.streampark.console.core.enums.ReleaseStateEnum;
import org.apache.streampark.console.core.enums.SparkAppStateEnum;
import org.apache.streampark.console.core.enums.SparkOperationEnum;
import org.apache.streampark.console.core.enums.SparkOptionStateEnum;
import org.apache.streampark.console.core.mapper.SparkApplicationMapper;
import org.apache.streampark.console.core.service.DistributedTaskService;
import org.apache.streampark.console.core.service.ResourceService;
import org.apache.streampark.console.core.service.SparkEnvService;
import org.apache.streampark.console.core.service.SparkSqlService;
@@ -129,13 +131,21 @@ public class SparkApplicationActionServiceImpl
@Autowired
private ResourceService resourceService;

@Autowired
private DistributedTaskService distributedTaskService;

private final Map<Long, CompletableFuture<SubmitResponse>> startJobFutureMap = new ConcurrentHashMap<>();

private final Map<Long, CompletableFuture<CancelResponse>> cancelJobFutureMap = new ConcurrentHashMap<>();

@Override
public void revoke(Long appId) throws ApplicationException {
SparkApplication application = getById(appId);
// For HA purposes, if the task is not processed locally, save the distributed task and return
if (!distributedTaskService.isLocalProcessing(appId)) {
distributedTaskService.saveDistributedTask(application, false, DistributedTaskEnum.REVOKE);
return;
}
ApiAlertException.throwIfNull(
application, String.format("The application id=%s not found, revoke failed.", appId));

@@ -161,15 +171,25 @@ public void revoke(Long appId) throws ApplicationException {

@Override
public void restart(SparkApplication appParam) throws Exception {
// For HA purposes, if the task is not processed locally, save the distributed task and return
if (!distributedTaskService.isLocalProcessing(appParam.getId())) {
distributedTaskService.saveDistributedTask(appParam, false, DistributedTaskEnum.RESTART);
return;
}
this.stop(appParam);
this.start(appParam, false);
}

@Override
public void forcedStop(Long id) {
SparkApplication application = this.baseMapper.selectApp(id);
// For HA purposes, if the task is not processed locally, save the distributed task and return
if (!distributedTaskService.isLocalProcessing(id)) {
distributedTaskService.saveDistributedTask(application, false, DistributedTaskEnum.FORCED_STOP);
return;
}
CompletableFuture<SubmitResponse> startFuture = startJobFutureMap.remove(id);
CompletableFuture<CancelResponse> stopFuture = cancelJobFutureMap.remove(id);
SparkApplication application = this.baseMapper.selectApp(id);
if (startFuture != null) {
startFuture.cancel(true);
}
@@ -183,6 +203,11 @@ public void forcedStop(Long id) {

@Override
public void stop(SparkApplication appParam) throws Exception {
// For HA purposes, if the task is not processed locally, save the distributed task and return
if (!distributedTaskService.isLocalProcessing(appParam.getId())) {
distributedTaskService.saveDistributedTask(appParam, false, DistributedTaskEnum.STOP);
return;
}
SparkAppHttpWatcher.setOptionState(appParam.getId(), SparkOptionStateEnum.STOPPING);
SparkApplication application = getById(appParam.getId());
application.setState(SparkAppStateEnum.STOPPING.getValue());
@@ -245,6 +270,11 @@ public void stop(SparkApplication appParam) throws Exception {

@Override
public void start(SparkApplication appParam, boolean auto) throws Exception {
// For HA purposes, if the task is not processed locally, save the distributed task and return
if (!distributedTaskService.isLocalProcessing(appParam.getId())) {
distributedTaskService.saveDistributedTask(appParam, false, DistributedTaskEnum.START);
return;
}
// 1) check application
final SparkApplication application = getById(appParam.getId());
AssertUtils.notNull(application);
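Every action method above opens with the same HA guard. Generalized, the pattern looks like the following (sketch only; the helper name dispatchIfRemote is illustrative and not in the commit):

import org.apache.streampark.console.core.entity.SparkApplication;
import org.apache.streampark.console.core.enums.DistributedTaskEnum;
import org.apache.streampark.console.core.service.DistributedTaskService;

public class HaGuardSketch {

    private final DistributedTaskService distributedTaskService;

    public HaGuardSketch(DistributedTaskService distributedTaskService) {
        this.distributedTaskService = distributedTaskService;
    }

    /**
     * Illustrative helper for the guard used by start/stop/restart/revoke/forcedStop:
     * returns true when another node owns the app, in which case the action has been
     * persisted as a distributed task and the caller should return immediately.
     */
    public boolean dispatchIfRemote(SparkApplication app, DistributedTaskEnum action) {
        if (!distributedTaskService.isLocalProcessing(app.getId())) {
            distributedTaskService.saveDistributedTask(app, false, action);
            return true;
        }
        return false;
    }
}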
(Diff for the remaining four changed files was not loaded in this view.)