[Feature] Support spark job status tracking (#3843)
* feat: add spark job state tracking

* feat: adjust based on updated code and add resource monitoring

* fix: fix e2e build failure

* feature: support configuring spark parameters

* feature: change cancel operation to stop operation

* fix: modify comment

* improve: remove flink-related features from SparkApplicationInfoService

---------

Co-authored-by: benjobs <[email protected]>
lenoxzhao and wolfboys authored Jul 19, 2024
1 parent 942116b commit af65569
Showing 29 changed files with 1,059 additions and 490 deletions.
@@ -632,4 +632,23 @@ create table `t_spark_app` (
index `inx_team` (`team_id`) using btree
) engine=innodb auto_increment=100000 default charset=utf8mb4 collate=utf8mb4_general_ci;


-- ----------------------------
-- table structure for t_spark_log
-- ----------------------------
drop table if exists `t_spark_log`;
create table `t_spark_log` (
`id` bigint not null auto_increment,
`app_id` bigint default null,
`spark_app_id` varchar(64) collate utf8mb4_general_ci default null,
`track_url` varchar(255) collate utf8mb4_general_ci default null,
`success` tinyint default null,
`exception` text collate utf8mb4_general_ci,
`option_time` datetime default null,
`option_name` tinyint default null,
`user_id` bigint default null,
primary key (`id`) using btree
) engine=innodb auto_increment=100000 default charset=utf8mb4 collate=utf8mb4_general_ci;


set foreign_key_checks = 1;
@@ -66,6 +66,22 @@ public Executor restAPIWatchingExecutor() {
ThreadUtils.threadFactory("flink-restapi-watching-executor-"));
}

/**
* Create a ThreadPoolExecutor for SparkAppHttpWatcher.
*
* @return Executor
*/
@Bean("sparkRestAPIWatchingExecutor")
public Executor sparkRestAPIWatchingExecutor() {
return new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors() * 5,
Runtime.getRuntime().availableProcessors() * 10,
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1024),
ThreadUtils.threadFactory("spark-cluster-watching-executor-"));
}
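The new bean mirrors the Flink watcher pool above it: core threads at 5x the CPU count, a hard cap at 10x, a 60-second keep-alive, and a bounded queue of 1024 so pending polls cannot grow without bound. A minimal sketch of how a watcher could consume it — SparkAppStatusProbe and fetchAndPersistState are illustrative names, not the commit's actual SparkAppHttpWatcher:

    import java.util.concurrent.Executor;

    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.scheduling.annotation.Scheduled;
    import org.springframework.stereotype.Component;

    // Hypothetical consumer of the bean above; assumes @EnableScheduling is active.
    @Component
    public class SparkAppStatusProbe {

        private final Executor executor;

        public SparkAppStatusProbe(
            @Qualifier("sparkRestAPIWatchingExecutor") Executor executor) {
            this.executor = executor;
        }

        @Scheduled(fixedDelay = 1000)
        public void watch() {
            // One poll task per tick; the bounded pool and queue above cap
            // concurrent HTTP calls to the YARN / Spark REST APIs.
            executor.execute(this::fetchAndPersistState);
        }

        private void fetchAndPersistState() {
            // query the REST API, map the response to SparkAppStateEnum, save it
        }
    }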

/**
* Create a ThreadPoolTaskExecutor for FlinkClusterWatcher.
*
@@ -19,12 +19,15 @@

import org.apache.streampark.common.enums.ClusterState;
import org.apache.streampark.common.enums.FlinkExecutionMode;
import org.apache.streampark.common.enums.SparkExecutionMode;
import org.apache.streampark.common.util.DateUtils;
import org.apache.streampark.common.util.YarnUtils;
import org.apache.streampark.console.core.entity.Application;
import org.apache.streampark.console.core.entity.FlinkCluster;
import org.apache.streampark.console.core.entity.SparkApplication;
import org.apache.streampark.console.core.enums.CheckPointStatusEnum;
import org.apache.streampark.console.core.enums.FlinkAppStateEnum;
import org.apache.streampark.console.core.enums.SparkAppStateEnum;

import lombok.Data;

@@ -140,6 +143,25 @@ public static AlertTemplate of(AlertProbeMsg alertProbeMsg) {
.build();
}

public static AlertTemplate of(SparkApplication application, SparkAppStateEnum appState) {
return new AlertTemplateBuilder()
.setDuration(application.getStartTime(), application.getEndTime())
.setJobName(application.getJobName())
.setLink(application.getSparkExecutionMode(), application.getJobId())
.setStartTime(application.getStartTime())
.setEndTime(application.getEndTime())
.setRestart(application.isNeedRestartOnFailed(), application.getRestartCount())
.setRestartIndex(application.getRestartCount())
.setTotalRestart(application.getRestartSize())
.setType(1)
.setTitle(
String.format(
"%s %s %s", ALERT_TITLE_PREFIX, application.getJobName(), appState.name()))
.setSubject(
String.format("%s %s %s", ALERT_SUBJECT_PREFIX, application.getJobName(), appState))
.setStatus(appState.name())
.build();
}

private static class AlertTemplateBuilder {

private final AlertTemplate alertTemplate = new AlertTemplate();
@@ -218,6 +240,17 @@ public AlertTemplateBuilder setLink(FlinkExecutionMode mode, String appId) {
return this;
}

public AlertTemplateBuilder setLink(SparkExecutionMode mode, String appId) {
if (SparkExecutionMode.isYarnMode(mode)) {
String format = "%s/proxy/%s/";
String url = String.format(format, YarnUtils.getRMWebAppURL(false), appId);
alertTemplate.setLink(url);
} else {
alertTemplate.setLink(null);
}
return this;
}

public AlertTemplateBuilder setCpFailureRateInterval(String cpFailureRateInterval) {
alertTemplate.setCpFailureRateInterval(cpFailureRateInterval);
return this;
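For YARN execution modes the alert link resolves to the ResourceManager proxy, i.e. http://<rm-address>/proxy/<appId>/; for all other modes the link is null. A hedged sketch of a call site — the real trigger in this commit lives in the watcher, and the alertService name and its alert(...) signature are assumptions:

    // Hypothetical call site; alertService and getAlertId() are assumed APIs.
    if (SparkAppStateEnum.FAILED == application.getStateEnum()) {
        AlertTemplate template = AlertTemplate.of(application, SparkAppStateEnum.FAILED);
        alertService.alert(application.getAlertId(), template);
    }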
@@ -24,12 +24,12 @@
import org.apache.streampark.console.base.exception.InternalException;
import org.apache.streampark.console.core.annotation.AppUpdated;
import org.apache.streampark.console.core.entity.ApplicationBackUp;
import org.apache.streampark.console.core.entity.ApplicationLog;
import org.apache.streampark.console.core.entity.SparkApplication;
import org.apache.streampark.console.core.entity.SparkApplicationLog;
import org.apache.streampark.console.core.enums.AppExistsStateEnum;
import org.apache.streampark.console.core.service.ApplicationBackUpService;
import org.apache.streampark.console.core.service.ApplicationLogService;
import org.apache.streampark.console.core.service.ResourceService;
import org.apache.streampark.console.core.service.SparkApplicationLogService;
import org.apache.streampark.console.core.service.application.SparkApplicationActionService;
import org.apache.streampark.console.core.service.application.SparkApplicationInfoService;
import org.apache.streampark.console.core.service.application.SparkApplicationManageService;
@@ -70,7 +70,7 @@ public class SparkApplicationController {
private ApplicationBackUpService backUpService;

@Autowired
private ApplicationLogService applicationLogService;
private SparkApplicationLogService applicationLogService;

@Autowired
private ResourceService resourceService;
@@ -153,8 +153,8 @@ public RestResponse start(SparkApplication app) {

@PostMapping(value = "cancel")
@RequiresPermissions("app:cancel")
public RestResponse cancel(SparkApplication app) throws Exception {
applicationActionService.cancel(app);
public RestResponse stop(SparkApplication app) throws Exception {
applicationActionService.stop(app);
return RestResponse.success();
}

@@ -209,8 +209,8 @@ public RestResponse backups(ApplicationBackUp backUp, RestRequest request) {
}

@PostMapping("optionlog")
public RestResponse optionlog(ApplicationLog applicationLog, RestRequest request) {
IPage<ApplicationLog> applicationList = applicationLogService.getPage(applicationLog, request);
public RestResponse optionlog(SparkApplicationLog applicationLog, RestRequest request) {
IPage<SparkApplicationLog> applicationList = applicationLogService.getPage(applicationLog, request);
return RestResponse.success(applicationList);
}

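Note that only the handler method was renamed from cancel to stop; the HTTP route ("cancel") and the app:cancel permission string are unchanged, so existing clients keep working. A hedged sketch of exercising the endpoint — the "/spark/app" base mapping and the TestRestTemplate wiring are assumptions, since the class-level request mapping is not part of this excerpt:

    // Hypothetical smoke test against the renamed handler.
    ResponseEntity<RestResponse> response =
        restTemplate.postForEntity("/spark/app/cancel", app, RestResponse.class);
    // RestResponse.success() comes back with HTTP 200 when the stop is accepted.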
@@ -32,6 +32,7 @@
import org.apache.streampark.console.core.enums.FlinkAppStateEnum;
import org.apache.streampark.console.core.enums.ReleaseStateEnum;
import org.apache.streampark.console.core.enums.ResourceFromEnum;
import org.apache.streampark.console.core.enums.SparkAppStateEnum;
import org.apache.streampark.console.core.metrics.flink.JobsOverview;
import org.apache.streampark.console.core.utils.YarnQueueLabelExpression;
import org.apache.streampark.flink.kubernetes.model.K8sPodTemplates;
@@ -285,19 +286,16 @@ public void setYarnQueueByHotParams() {
}

/**
* Determine if a FlinkAppState requires tracking.
* Determine if a SparkAppState requires tracking.
*
* @return true: need to be tracked | false: no need to be tracked.
*/
public Boolean shouldTracking() {
switch (getStateEnum()) {
case ADDED:
case CREATED:
case FINISHED:
case FAILED:
case CANCELED:
case TERMINATED:
case POS_TERMINATED:
case KILLED:
return false;
default:
return true;
@@ -312,15 +310,11 @@ public Boolean shouldTracking() {
public boolean isCanBeStart() {
switch (getStateEnum()) {
case ADDED:
case CREATED:
case FAILED:
case CANCELED:
case FINISHED:
case LOST:
case TERMINATED:
case SUCCEEDED:
case KILLED:
case POS_TERMINATED:
return true;
default:
return false;
@@ -338,8 +332,8 @@ public FlinkDevelopmentMode getDevelopmentMode() {
}

@JsonIgnore
public FlinkAppStateEnum getStateEnum() {
return FlinkAppStateEnum.of(state);
public SparkAppStateEnum getStateEnum() {
return SparkAppStateEnum.of(state);
}

@JsonIgnore
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.streampark.console.core.entity;

import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;

import java.util.Date;

@Data
@TableName("t_spark_log")
@Slf4j
public class SparkApplicationLog {

@TableId(type = IdType.AUTO)
private Long id;
/** appId */
private Long appId;
/** applicationId */
private String sparkAppId;
/** tracking url of current spark application */
private String trackUrl;
/** start status */
private Boolean success;
/** option name */
private Integer optionName;
/** option time */
private Date optionTime;
/** exception at the start */
private String exception;
/** The user who operates the application */
private Long userId;
}
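The entity maps one-to-one onto the t_spark_log columns above; MyBatis-Plus's default camel-to-snake mapping handles appId → app_id, optionName → option_name, and so on. A sketch of recording a start attempt — the mapper interface, the optionName encoding, and all literal values are illustrative assumptions, not part of this commit:

    import com.baomidou.mybatisplus.core.mapper.BaseMapper;

    import java.util.Date;

    // Hypothetical mapper; the commit routes writes through SparkApplicationLogService.
    public interface SparkApplicationLogMapper extends BaseMapper<SparkApplicationLog> {
    }

    // Recording a successful start operation (values are illustrative):
    SparkApplicationLog log = new SparkApplicationLog();
    log.setAppId(100001L);
    log.setSparkAppId("application_1721000000000_0001");
    log.setTrackUrl("http://rm-host:8088/proxy/application_1721000000000_0001/");
    log.setOptionName(1); // assumed encoding: 1 = start
    log.setOptionTime(new Date());
    log.setSuccess(true);
    logMapper.insert(log); // BaseMapper#insert is the standard MyBatis-Plus API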
@@ -26,75 +26,47 @@ public enum SparkAppStateEnum {
/** Added new job to database. */
ADDED(0),

/**
* The job has been received by the Dispatcher, and is waiting for the job manager to be created.
*/
INITIALIZING(1),
/** (From Yarn) Application which was just created. */
NEW(1),

/** Job is newly created, no task has started to run. */
CREATED(2),
/** (From Yarn) Application which is being saved. */
NEW_SAVING(2),

/** Application which is currently running. */
STARTING(3),

/** Application which is currently running. */
RESTARTING(4),
/** (From Yarn) Application which has been submitted. */
SUBMITTED(4),

/** Some tasks are scheduled or running, some may be pending, some may be finished. */
RUNNING(5),
/** (From Yarn) Application which has been accepted by the scheduler. */
ACCEPTED(5),

/** The job has failed and is currently waiting for the cleanup to complete. */
FAILING(6),

/** The job has failed with a non-recoverable task failure. */
FAILED(7),

/** Job is being cancelled. */
CANCELLING(8),

/** Job has been cancelled. */
CANCELED(9),
RUNNING(6),

/** All the job's tasks have successfully finished. */
FINISHED(10),
/** (From Yarn) Application which finished successfully. */
FINISHED(7),

/**
* The job has been suspended which means that it has been stopped but not been removed from a
* potential HA job store.
*/
SUSPENDED(11),

/** The job is currently reconciling and waits for task execution report to recover state. */
RECONCILING(12),
/** (From Yarn) Application which failed. */
FAILED(8),

/** Loss of mapping. */
LOST(13),
LOST(9),

/** Mapping. */
MAPPING(14),
MAPPING(10),

/** Other statuses. */
OTHER(15),
OTHER(11),

/** Has rollback. */
REVOKED(16),

/**
* Lost track of Spark job temporarily. A complete loss of Spark job tracking translates into LOST
* state.
*/
@Deprecated
SILENT(17),

/** Spark job has terminated vaguely, maybe FINISHED, CANCELED or FAILED. */
TERMINATED(18),
REVOKED(12),

/** Spark job has terminated vaguely, maybe FINISHED, CANCELED or FAILED. */
@Deprecated
POS_TERMINATED(19),
/** Spark job is being cancelled (killed) by StreamPark. */
STOPPING(13),

/** Job SUCCEEDED on yarn. */
SUCCEEDED(20),
SUCCEEDED(14),

/** Has killed in Yarn. */
KILLED(-9);
@@ -125,13 +97,11 @@ public static SparkAppStateEnum of(String name) {

public static boolean isEndState(Integer appState) {
SparkAppStateEnum sparkAppStateEnum = SparkAppStateEnum.of(appState);
return SparkAppStateEnum.CANCELED == sparkAppStateEnum
|| SparkAppStateEnum.FAILED == sparkAppStateEnum
return SparkAppStateEnum.FAILED == sparkAppStateEnum
|| SparkAppStateEnum.KILLED == sparkAppStateEnum
|| SparkAppStateEnum.FINISHED == sparkAppStateEnum
|| SparkAppStateEnum.SUCCEEDED == sparkAppStateEnum
|| SparkAppStateEnum.LOST == sparkAppStateEnum
|| SparkAppStateEnum.TERMINATED == sparkAppStateEnum;
|| SparkAppStateEnum.LOST == sparkAppStateEnum;
}

public static boolean isLost(Integer appState) {
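With CANCELED and TERMINATED removed from the enum, isEndState now recognizes exactly five terminal states: FINISHED, SUCCEEDED, FAILED, KILLED, and LOST. A hedged sketch of how a polling loop might use it — watcher.unWatching is an assumed method name, not an API shown in this diff:

    // Exit condition of a hypothetical tracking loop:
    Integer state = application.getState();
    if (SparkAppStateEnum.isEndState(state)) {
        // FINISHED / SUCCEEDED / FAILED / KILLED / LOST: stop polling this app.
        watcher.unWatching(application.getId()); // assumed API
    }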
(Diff truncated: the remaining changed files were not loaded in this view.)