[Feature] Redesigned Spark application entity (#3952)
* feat: add spark job state tracking

* feat: adjust based on updated code and add resource monitoring

* fix: fix e2e build failure

* feature: support configuring spark parameters

* feature: change cancel operation to stop operation

* fix: modify comment

* fix: line alignment

* improve: remove flink-related features from SparkApplicationInfoService

* feature: reconstruct SparkApplication and merge dev

* feature: merge dev

* feature: add new tracking metrics

* feature: add args and queue configuration

* feature: add spark sql validation

* feature: add app db schema and fix some problems

* improve: add comments

* fix: fix status fetching

* feat: persist schedule configuration

* improve: modify comments

* improve: extract some variables as constants

* fix: adjust code style

---------

Co-authored-by: benjobs <[email protected]>
lenoxzhao and wolfboys authored Aug 15, 2024
1 parent 9aff8c8 commit 2303f06
Showing 52 changed files with 3,382 additions and 1,124 deletions.
@@ -20,7 +20,7 @@
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

/** The flink deployment mode enum. */
/** The spark deployment mode enum. */
public enum SparkDevelopmentMode {

/** Unknown type replace null */
@@ -29,8 +29,11 @@ public enum SparkDevelopmentMode {
/** custom code */
CUSTOM_CODE("Custom Code", 1),

/** spark SQL */
SPARK_SQL("Spark SQL", 2);
/** Spark SQL */
SPARK_SQL("Spark SQL", 2),

/** PySpark mode */
PYSPARK("Python Spark", 3);

private final String name;

@@ -44,17 +47,22 @@ public enum SparkDevelopmentMode {
/**
* Try to resolve the mode value into {@link SparkDevelopmentMode}.
*
* @param value The mode value of potential flink deployment mode.
* @return The parsed flink deployment mode.
* @param value The mode value of potential spark deployment mode.
* @return The parsed spark deployment mode.
*/
@Nonnull
public static SparkDevelopmentMode valueOf(@Nullable Integer value) {
for (SparkDevelopmentMode flinkDevelopmentMode : values()) {
if (flinkDevelopmentMode.mode.equals(value)) {
return flinkDevelopmentMode;
for (SparkDevelopmentMode sparkDevelopmentMode : values()) {
if (sparkDevelopmentMode.mode.equals(value)) {
return sparkDevelopmentMode;
}
}
return SparkDevelopmentMode.UNKNOWN;
}

/** Get the mode value of the current {@link SparkDevelopmentMode} enum. */
@Nonnull
public Integer getMode() {
return mode;
}
}
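
As a rough illustration (not part of this diff), a stored mode value can be resolved back through valueOf, which falls back to UNKNOWN for unmatched or null values. The object name and call site below are assumptions, and the import path is inferred from the sibling enum added in this commit:

import org.apache.streampark.common.enums.SparkDevelopmentMode

object SparkDevelopmentModeExample extends App {
  // Resolve a stored job_type value back into the enum; unmatched or null values
  // resolve to UNKNOWN instead of throwing.
  val mode: SparkDevelopmentMode = SparkDevelopmentMode.valueOf(Integer.valueOf(3))
  assert(mode == SparkDevelopmentMode.PYSPARK)

  // getMode returns the numeric value, e.g. when writing the mode back to storage.
  println(s"$mode -> ${mode.getMode}")
}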
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.streampark.common.enums;

import javax.annotation.Nullable;

/** Spark SQL validation failed type enum. */
public enum SparkSqlValidationFailedType {

/** Basic test failed (such as null, etc.) */
VERIFY_FAILED(1),

/** syntax error */
SYNTAX_ERROR(2),

/** unsupported dialect */
UNSUPPORTED_DIALECT(3),

/** unsupported sql command */
UNSUPPORTED_SQL(4),

/** Does not end with ";" */
ENDS_WITH(5),

/** Class exception */
CLASS_ERROR(6);

private final int failedType;

SparkSqlValidationFailedType(int failedType) {
this.failedType = failedType;
}

/**
* Try to resolve the given spark SQL validation failed type value into a known {@link
* SparkSqlValidationFailedType} enum.
*/
@Nullable
public static SparkSqlValidationFailedType of(Integer value) {
for (SparkSqlValidationFailedType type : values()) {
if (type.failedType == value) {
return type;
}
}
return null;
}

public int getFailedType() {
return failedType;
}
}
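
A minimal usage sketch (the validation service itself is outside this hunk, so the calls below are assumptions); note that of() returns null rather than a fallback constant, so callers need to null-check:

import org.apache.streampark.common.enums.SparkSqlValidationFailedType

object SparkSqlValidationFailedTypeExample extends App {
  // Known codes resolve to their constant...
  assert(SparkSqlValidationFailedType.of(Integer.valueOf(2)) == SparkSqlValidationFailedType.SYNTAX_ERROR)
  // ...while unknown codes resolve to null, so callers must null-check.
  assert(SparkSqlValidationFailedType.of(Integer.valueOf(99)) == null)
}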
@@ -65,6 +65,33 @@ object ConfigKeys {

val KEY_SPARK_BATCH_DURATION = "spark.batch.duration"

val KEY_SPARK_DRIVER_CORES = "spark.driver.cores"

val KEY_SPARK_DRIVER_MEMORY = "spark.driver.memory"

val KEY_SPARK_EXECUTOR_INSTANCES = "spark.executor.instances"

val KEY_SPARK_EXECUTOR_CORES = "spark.executor.cores"

val KEY_SPARK_EXECUTOR_MEMORY = "spark.executor.memory"

val KEY_SPARK_DYNAMIC_ALLOCATION_ENABLED = "spark.dynamicAllocation.enabled"

val KEY_SPARK_DYNAMIC_ALLOCATION_MAX_EXECUTORS = "spark.dynamicAllocation.maxExecutors"

val KEY_SPARK_YARN_QUEUE = "spark.yarn.queue"

val KEY_SPARK_YARN_QUEUE_NAME = "yarnQueueName"

val KEY_SPARK_YARN_QUEUE_LABEL = "yarnQueueLabel"

val KEY_SPARK_YARN_AM_NODE_LABEL = "spark.yarn.am.nodeLabelExpression"

val KEY_SPARK_YARN_EXECUTOR_NODE_LABEL = "spark.yarn.executor.nodeLabelExpression"

def KEY_SPARK_SQL(prefix: String = null): String =
s"${Option(prefix).getOrElse("")}sql"

/** about config flink */
def KEY_APP_CONF(prefix: String = null): String =
s"${Option(prefix).getOrElse("")}conf"
@@ -87,7 +114,7 @@ object ConfigKeys {

val KEY_FLINK_TABLE_PREFIX = "flink.table."

val KEY_SPARK_PROPERTY_PREFIX = "spark.property."
val KEY_SPARK_PROPERTY_PREFIX = "spark."

val KEY_APP_PREFIX = "app."

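
For orientation, a hedged sketch of how the new keys and the KEY_SPARK_SQL helper could be combined when assembling submit properties; only the key names come from this hunk, while the import path and the surrounding map-building code are assumptions:

import org.apache.streampark.common.conf.ConfigKeys._

object SparkConfigKeysExample extends App {
  // Assumed usage: collect spark-submit properties keyed by the constants above.
  val sparkConf: Map[String, String] = Map(
    KEY_SPARK_DRIVER_MEMORY -> "2g",
    KEY_SPARK_EXECUTOR_CORES -> "2",
    KEY_SPARK_YARN_QUEUE -> "default")

  // KEY_SPARK_SQL(prefix) prepends an optional prefix to "sql":
  // KEY_SPARK_SQL("spark.") yields "spark.sql", KEY_SPARK_SQL() yields "sql".
  println(KEY_SPARK_SQL("spark."))
  println(sparkConf)
}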
@@ -37,6 +37,12 @@ object PropertiesUtils extends Logger {

private[this] lazy val PROPERTY_PATTERN = Pattern.compile("(.*?)=(.*?)")

private[this] lazy val SPARK_PROPERTY_COMPLEX_PATTERN = Pattern.compile("^[\"']?(.*?)=(.*?)[\"']?$")

// scalastyle:off
private[this] lazy val SPARK_ARGUMENT_REGEXP = "\"?(\\s+|$)(?=(([^\"]*\"){2})*[^\"]*$)\"?"
// scalastyle:on

private[this] lazy val MULTI_PROPERTY_REGEXP = "-D(.*?)\\s*=\\s*[\\\"|'](.*)[\\\"|']"

private[this] lazy val MULTI_PROPERTY_PATTERN = Pattern.compile(MULTI_PROPERTY_REGEXP)
@@ -380,4 +386,48 @@ object PropertiesUtils extends Logger {
new JavaHashMap[String, JavaMap[String, String]](map)
}

/** extract spark configuration from sparkApplication.appProperties */
@Nonnull def extractSparkPropertiesAsJava(properties: String): JavaMap[String, String] =
new JavaHashMap[String, String](extractSparkProperties(properties))

@Nonnull def extractSparkProperties(properties: String): Map[String, String] = {
if (StringUtils.isEmpty(properties)) Map.empty[String, String]
else {
val map = mutable.Map[String, String]()
properties.split("(\\s)*(--conf|-c)(\\s)+") match {
case d if Utils.isNotEmpty(d) =>
d.foreach(x => {
if (x.nonEmpty) {
val p = SPARK_PROPERTY_COMPLEX_PATTERN.matcher(x)
if (p.matches) {
map += p.group(1).trim -> p.group(2).trim
}
}
})
case _ =>
}
map.toMap
}
}

/** extract spark configuration from sparkApplication.appArgs */
@Nonnull def extractSparkArgumentsAsJava(arguments: String): JavaList[String] =
new JavaArrayList[String](extractSparkArguments(arguments))

@Nonnull def extractSparkArguments(arguments: String): List[String] = {
if (StringUtils.isEmpty(arguments)) List.empty[String]
else {
val list = mutable.ListBuffer[String]()
arguments.split(SPARK_ARGUMENT_REGEXP) match {
case d if Utils.isNotEmpty(d) =>
d.foreach(x => {
if (x.nonEmpty) {
list += x
}
})
case _ =>
}
list.toList
}
}
}
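
To make the expected behavior of the new helpers concrete, a small usage sketch with assumed inputs (the import path is also an assumption; the parsing rules are the ones defined above):

import org.apache.streampark.common.util.PropertiesUtils

object SparkExtractionExample extends App {
  // appProperties style input: "--conf key=value" pairs.
  val props = PropertiesUtils.extractSparkProperties(
    "--conf spark.executor.memory=4g --conf spark.executor.cores=2")
  // Expected: Map(spark.executor.memory -> 4g, spark.executor.cores -> 2)
  println(props)

  // appArgs style input: whitespace-separated, double-quoted arguments kept whole.
  val args = PropertiesUtils.extractSparkArguments("""100 "hello world" --verbose""")
  // Expected: List(100, hello world, --verbose)
  println(args)
}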
@@ -567,65 +567,56 @@ drop table if exists `t_spark_app`;
create table `t_spark_app` (
`id` bigint not null auto_increment,
`team_id` bigint not null,
`job_type` tinyint default null,
`execution_mode` tinyint default null,
`job_type` tinyint default null comment '(1)custom code(2)spark SQL',
`app_type` tinyint default null comment '(1)Apache Spark(2)StreamPark Spark',
`version_id` bigint default null comment 'spark version',
`app_name` varchar(255) collate utf8mb4_general_ci default null comment 'spark.app.name',
`execution_mode` tinyint default null comment 'spark.submit.deployMode(1)cluster(2)client',
`resource_from` tinyint default null,
`project_id` bigint default null,
`job_name` varchar(255) collate utf8mb4_general_ci default null,
`module` varchar(255) collate utf8mb4_general_ci default null,
`main_class` varchar(255) collate utf8mb4_general_ci default null comment 'The entry point for your application (e.g. org.apache.spark.examples.SparkPi)',
`jar` varchar(255) collate utf8mb4_general_ci default null,
`jar_check_sum` bigint default null,
`main_class` varchar(255) collate utf8mb4_general_ci default null,
`args` text collate utf8mb4_general_ci,
`options` text collate utf8mb4_general_ci,
`hot_params` text collate utf8mb4_general_ci,
`user_id` bigint default null,
`app_id` varchar(64) collate utf8mb4_general_ci default null,
`app_type` tinyint default null,
`duration` bigint default null,
`job_id` varchar(64) collate utf8mb4_general_ci default null,
`job_manager_url` varchar(255) collate utf8mb4_general_ci default null,
`version_id` bigint default null,
`cluster_id` varchar(45) collate utf8mb4_general_ci default null,
`k8s_name` varchar(63) collate utf8mb4_general_ci default null,
`k8s_namespace` varchar(63) collate utf8mb4_general_ci default null,
`spark_image` varchar(128) collate utf8mb4_general_ci default null,
`state` int default null,
`app_properties` text collate utf8mb4_general_ci comment 'Arbitrary Spark configuration property in key=value format (e.g. spark.driver.cores=1)',
`app_args` text collate utf8mb4_general_ci comment 'Arguments passed to the main method of your main class',
`app_id` varchar(64) collate utf8mb4_general_ci default null comment '(1)application_id on yarn(2)driver_pod_name on k8s',
`yarn_queue` varchar(128) collate utf8mb4_general_ci default null,
`k8s_master_url` varchar(128) collate utf8mb4_general_ci default null,
`k8s_container_image` varchar(128) collate utf8mb4_general_ci default null,
`k8s_image_pull_policy` tinyint default 1,
`k8s_service_account` varchar(64) collate utf8mb4_general_ci default null,
`k8s_namespace` varchar(64) collate utf8mb4_general_ci default null,
`hadoop_user` varchar(64) collate utf8mb4_general_ci default null,
`restart_size` int default null,
`restart_count` int default null,
`cp_threshold` int default null,
`cp_max_failure_interval` int default null,
`cp_failure_rate_interval` int default null,
`cp_failure_action` tinyint default null,
`dynamic_properties` text collate utf8mb4_general_ci,
`description` varchar(255) collate utf8mb4_general_ci default null,
`resolve_order` tinyint default null,
`k8s_rest_exposed_type` tinyint default null,
`jm_memory` int default null,
`tm_memory` int default null,
`total_task` int default null,
`total_tm` int default null,
`total_slot` int default null,
`available_slot` int default null,
`state` int default null,
`options` text collate utf8mb4_general_ci,
`option_state` tinyint default null,
`tracking` tinyint default null,
`create_time` datetime default null comment 'create time',
`modify_time` datetime default null comment 'modify time',
`option_time` datetime default null,
`user_id` bigint default null,
`description` varchar(255) collate utf8mb4_general_ci default null,
`tracking` tinyint default null,
`release` tinyint default 1,
`build` tinyint default 1,
`alert_id` bigint default null,
`create_time` datetime default null,
`modify_time` datetime default null,
`start_time` datetime default null,
`end_time` datetime default null,
`alert_id` bigint default null,
`k8s_pod_template` text collate utf8mb4_general_ci,
`k8s_jm_pod_template` text collate utf8mb4_general_ci,
`k8s_tm_pod_template` text collate utf8mb4_general_ci,
`k8s_hadoop_integration` tinyint default 0,
`spark_cluster_id` bigint default null,
`ingress_template` text collate utf8mb4_general_ci,
`default_mode_ingress` text collate utf8mb4_general_ci,
`tags` varchar(500) default null,
`hadoop_user` varchar(64) collate utf8mb4_general_ci default null,
`duration` bigint default null,
`tags` varchar(500) collate utf8mb4_general_ci default null,
`driver_cores` varchar(64) collate utf8mb4_general_ci default null,
`driver_memory` varchar(64) collate utf8mb4_general_ci default null,
`executor_cores` varchar(64) collate utf8mb4_general_ci default null,
`executor_memory` varchar(64) collate utf8mb4_general_ci default null,
`executor_max_nums` varchar(64) collate utf8mb4_general_ci default null,
`num_tasks` bigint default null,
`num_completed_tasks` bigint default null,
`num_stages` bigint default null,
`num_completed_stages` bigint default null,
`used_memory` bigint default null,
`used_v_cores` bigint default null,
primary key (`id`) using btree,
key `inx_job_type` (`job_type`) using btree,
key `inx_track` (`tracking`) using btree,
@@ -651,4 +642,69 @@ create table `t_spark_log` (
) engine=innodb auto_increment=100000 default charset=utf8mb4 collate=utf8mb4_general_ci;


-- ----------------------------
-- table structure for t_spark_effective
-- ----------------------------
drop table if exists `t_spark_effective`;
create table `t_spark_effective` (
`id` bigint not null auto_increment,
`app_id` bigint not null,
`target_type` tinyint not null comment '1) config 2) spark sql',
`target_id` bigint not null comment 'configid or sqlid',
`create_time` datetime default null comment 'create time',
primary key (`id`) using btree,
unique key `un_effective_inx` (`app_id`,`target_type`) using btree
) engine=innodb auto_increment=100000 default charset=utf8mb4 collate=utf8mb4_general_ci;


-- ----------------------------
-- table structure for t_spark_config
-- ----------------------------
drop table if exists `t_spark_config`;
create table `t_spark_config` (
`id` bigint not null auto_increment,
`app_id` bigint not null,
`format` tinyint not null default 0,
`version` int not null,
`latest` tinyint not null default 0,
`content` text collate utf8mb4_general_ci not null,
`create_time` datetime default null comment 'create time',
primary key (`id`) using btree
) engine=innodb auto_increment=100000 default charset=utf8mb4 collate=utf8mb4_general_ci;


-- ----------------------------
-- Table structure for t_spark_sql
-- ----------------------------
drop table if exists `t_spark_sql`;
create table `t_spark_sql` (
`id` bigint not null auto_increment,
`app_id` bigint default null,
`sql` text collate utf8mb4_general_ci,
`team_resource` varchar(64) collate utf8mb4_general_ci,
`dependency` text collate utf8mb4_general_ci,
`version` int default null,
`candidate` tinyint not null default 1,
`create_time` datetime default null comment 'create time',
primary key (`id`) using btree
) engine=innodb auto_increment=100000 default charset=utf8mb4 collate=utf8mb4_general_ci;


-- ----------------------------
-- table structure for t_spark_app_backup
-- ----------------------------
drop table if exists `t_spark_app_backup`;
create table `t_spark_app_backup` (
`id` bigint not null auto_increment,
`app_id` bigint default null,
`sql_id` bigint default null,
`config_id` bigint default null,
`version` int default null,
`path` varchar(128) collate utf8mb4_general_ci default null,
`description` varchar(255) collate utf8mb4_general_ci default null,
`create_time` datetime default null comment 'create time',
primary key (`id`) using btree
) engine=innodb auto_increment=100000 default charset=utf8mb4 collate=utf8mb4_general_ci;


set foreign_key_checks = 1;