Skip to content

Commit

Permalink
[Improve] flink-app & spark-app id auto-increment improvements (#4098)
Browse files Browse the repository at this point in the history
* [Improve] StopRequest class-name rename to CancelRequest

* [Improve]  spark & flink app-id auto-increment Improvements

* [Improve] t_flink_log table rename to t_app_log

* [Improve] Effective entity rename to FlinkEffective
  • Loading branch information
wolfboys authored Sep 27, 2024
1 parent 27e97a6 commit e378ea2
Show file tree
Hide file tree
Showing 69 changed files with 511 additions and 247 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,6 @@ case class Workspace(storageType: StorageType) {

lazy val APP_WORKSPACE = s"$WORKSPACE/workspace"

lazy val SPARK_APP_WORKSPACE = s"$WORKSPACE/spark-workspace"

lazy val APP_FLINK = s"$WORKSPACE/flink"

lazy val APP_SPARK = s"$WORKSPACE/spark"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,27 +22,22 @@ set names utf8mb4;
set foreign_key_checks = 0;

-- ----------------------------
-- table structure for t_flink_app_backup
-- Table structure for t_app
-- ----------------------------
drop table if exists `t_flink_app_backup`;
create table `t_flink_app_backup` (
`id` bigint not null auto_increment,
`app_id` bigint default null,
`sql_id` bigint default null,
`config_id` bigint default null,
`version` int default null,
`path` varchar(128) collate utf8mb4_general_ci default null,
`description` varchar(255) collate utf8mb4_general_ci default null,
`create_time` datetime default null comment 'create time',
primary key (`id`) using btree
) engine=innodb auto_increment=100000 default charset=utf8mb4 collate=utf8mb4_general_ci;
create table if not exists `t_app` (
`id` bigint not null,
`job_type` tinyint default null,
`create_time` datetime default null comment 'create time',
`modify_time` datetime default null comment 'modify time',
primary key(`id`)
);

-- ----------------------------
-- Table structure for t_flink_app
-- ----------------------------
drop table if exists `t_flink_app`;
create table `t_flink_app` (
`id` bigint not null auto_increment,
`id` bigint not null,
`team_id` bigint not null,
`job_type` tinyint default null,
`deploy_mode` tinyint default null,
Expand Down Expand Up @@ -110,6 +105,23 @@ create table `t_flink_app` (
) engine=innodb auto_increment=100000 default charset=utf8mb4 collate=utf8mb4_general_ci;


-- ----------------------------
-- table structure for t_flink_app_backup
-- ----------------------------
drop table if exists `t_flink_app_backup`;
create table `t_flink_app_backup` (
`id` bigint not null,
`app_id` bigint default null,
`sql_id` bigint default null,
`config_id` bigint default null,
`version` int default null,
`path` varchar(128) collate utf8mb4_general_ci default null,
`description` varchar(255) collate utf8mb4_general_ci default null,
`create_time` datetime default null comment 'create time',
primary key (`id`) using btree
) engine=innodb auto_increment=100000 default charset=utf8mb4 collate=utf8mb4_general_ci;


-- ----------------------------
-- table structure for t_flink_config
-- ----------------------------
Expand Down Expand Up @@ -161,14 +173,14 @@ create table `t_flink_env` (


-- ----------------------------
-- table structure for t_flink_log
-- table structure for t_app_log
-- ----------------------------
drop table if exists `t_flink_log`;
create table `t_flink_log` (
drop table if exists `t_app_log`;
create table `t_app_log` (
`id` bigint not null auto_increment,
`app_id` bigint default null,
`yarn_app_id` varchar(64) collate utf8mb4_general_ci default null,
`job_manager_url` varchar(255) collate utf8mb4_general_ci default null,
`cluster_id` varchar(64) collate utf8mb4_general_ci default null,
`tracking_url` varchar(255) collate utf8mb4_general_ci default null,
`success` tinyint default null,
`exception` text collate utf8mb4_general_ci,
`option_time` datetime default null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ drop table if exists "public"."t_app_build_pipe";
drop table if exists "public"."t_flink_app_backup";
drop table if exists "public"."t_alert_config";
drop table if exists "public"."t_access_token";
drop table if exists "public"."t_flink_log";
drop table if exists "public"."t_app_log";
drop table if exists "public"."t_team";
drop table if exists "public"."t_variable";
drop table if exists "public"."t_external_link";
Expand All @@ -66,7 +66,7 @@ drop sequence if exists "public"."streampark_t_distributed_task_id_seq";
drop sequence if exists "public"."streampark_t_flink_app_backup_id_seq";
drop sequence if exists "public"."streampark_t_alert_config_id_seq";
drop sequence if exists "public"."streampark_t_access_token_id_seq";
drop sequence if exists "public"."streampark_t_flink_log_id_seq";
drop sequence if exists "public"."streampark_t_app_log_id_seq";
drop sequence if exists "public"."streampark_t_team_id_seq";
drop sequence if exists "public"."streampark_t_variable_id_seq";
drop sequence if exists "public"."streampark_t_external_link_id_seq";
Expand Down Expand Up @@ -192,7 +192,7 @@ create sequence "public"."streampark_t_flink_app_id_seq"
increment 1 start 10000 cache 1 minvalue 10000 maxvalue 9223372036854775807;

create table "public"."t_flink_app" (
"id" int8 not null default nextval('streampark_t_flink_app_id_seq'::regclass),
"id" int8 not null,
"team_id" int8,
"job_type" int2,
"deploy_mode" int2,
Expand Down Expand Up @@ -403,24 +403,24 @@ create index "un_env_name" on "public"."t_flink_env" using btree (


-- ----------------------------
-- table structure for t_flink_log
-- table structure for t_app_log
-- ----------------------------
create sequence "public"."streampark_t_flink_log_id_seq"
create sequence "public"."streampark_t_app_log_id_seq"
increment 1 start 10000 cache 1 minvalue 10000 maxvalue 9223372036854775807;

create table "public"."t_flink_log" (
"id" int8 not null default nextval('streampark_t_flink_log_id_seq'::regclass),
create table "public"."t_app_log" (
"id" int8 not null default nextval('streampark_t_app_log_id_seq'::regclass),
"app_id" int8,
"yarn_app_id" varchar(64) collate "pg_catalog"."default",
"job_manager_url" varchar(255) collate "pg_catalog"."default",
"cluster_id" varchar(64) collate "pg_catalog"."default",
"tracking_url" varchar(255) collate "pg_catalog"."default",
"success" boolean,
"exception" text collate "pg_catalog"."default",
"option_time" timestamp(6),
"option_name" int2,
"user_id" int8
)
;
alter table "public"."t_flink_log" add constraint "t_flink_log_pkey" primary key ("id");
alter table "public"."t_app_log" add constraint "t_app_log_pkey" primary key ("id");


-- ----------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,33 @@ use streampark;
set names utf8mb4;
set foreign_key_checks = 0;

alter table t_app_backup rename to t_flink_app_backup;

-- ----------------------------
-- Table structure for t_app
-- ----------------------------
create table if not exists `t_app` (
`id` bigint not null,
`job_type` tinyint default null,
`create_time` datetime default null comment 'create time',
`modify_time` datetime default null comment 'modify time',
primary key(`id`)
);


alter table `t_flink_app`
add column `k8s_name` varchar(63) collate utf8mb4_general_ci default null,
modify column `id` not null;
add column `k8s_name` varchar(63) collate utf8mb4_general_ci default null,
-- modify_time change with duration #3188
modify column `modify_time` datetime not null default current_timestamp comment 'modify time';

alter table `t_flink_log`
alter table t_app_backup rename to t_flink_app_backup;

alter table t_flink_log rename to t_app_log;

alter table `t_app_log`
change column `yarn_app_id` `cluster_id` varchar(64) default null,
change column `job_manager_url` `tracking_url` varchar(255) default null,
add column `job_type` tinyint default null,
add column `user_id` bigint default null comment 'operator user id';

alter table `t_flink_project`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.apache.streampark.console.base.domain.RestResponse;
import org.apache.streampark.console.core.entity.FlinkApplication;
import org.apache.streampark.console.core.entity.FlinkApplicationConfig;
import org.apache.streampark.console.core.service.FlinkApplicationConfigService;
import org.apache.streampark.console.core.service.application.FlinkApplicationConfigService;

import org.apache.shiro.authz.annotation.RequiresPermissions;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.apache.streampark.console.core.annotation.Permission;
import org.apache.streampark.console.core.bean.AppBuildDockerResolvedDetail;
import org.apache.streampark.console.core.entity.AppBuildPipeline;
import org.apache.streampark.console.core.service.AppBuildPipeService;
import org.apache.streampark.console.core.service.application.AppBuildPipeService;
import org.apache.streampark.flink.packer.pipeline.DockerResolvedSnapshot;
import org.apache.streampark.flink.packer.pipeline.PipelineTypeEnum;

Expand All @@ -42,7 +42,7 @@
@Validated
@RestController
@RequestMapping("flink/pipe")
public class ApplicationBuildPipelineController {
public class FlinkApplicationBuildPipelineController {

@Autowired
private AppBuildPipeService appBuildPipeService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@
import org.apache.streampark.console.core.entity.FlinkApplication;
import org.apache.streampark.console.core.entity.FlinkApplicationBackUp;
import org.apache.streampark.console.core.enums.AppExistsStateEnum;
import org.apache.streampark.console.core.service.ApplicationLogService;
import org.apache.streampark.console.core.service.FlinkApplicationBackUpService;
import org.apache.streampark.console.core.service.ResourceService;
import org.apache.streampark.console.core.service.application.ApplicationLogService;
import org.apache.streampark.console.core.service.application.FlinkApplicationActionService;
import org.apache.streampark.console.core.service.application.FlinkApplicationBackUpService;
import org.apache.streampark.console.core.service.application.FlinkApplicationInfoService;
import org.apache.streampark.console.core.service.application.FlinkApplicationManageService;

Expand All @@ -55,7 +55,7 @@
@Validated
@RestController
@RequestMapping("flink/app")
public class ApplicationController {
public class FlinkApplicationController {

@Autowired
private FlinkApplicationManageService applicationManageService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
@Validated
@RestController
@RequestMapping("flink/history")
public class ApplicationHistoryController {
public class FlinkApplicationHistoryController {

@Autowired
private FlinkApplicationInfoService applicationInfoService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.apache.streampark.console.base.domain.RestResponse;
import org.apache.streampark.console.core.annotation.Permission;
import org.apache.streampark.console.core.bean.FlinkCatalogParams;
import org.apache.streampark.console.core.service.CatalogService;
import org.apache.streampark.console.core.service.FlinkCatalogService;
import org.apache.streampark.console.core.util.ServiceHelper;

import org.apache.shiro.authz.annotation.RequiresPermissions;
Expand All @@ -40,10 +40,10 @@
@Validated
@RestController
@RequestMapping("flink/catalog")
public class CatalogController {
public class FlinkCatalogController {

@Autowired
CatalogService catalogService;
FlinkCatalogService catalogService;

@Permission(team = "#catalog.teamId")
@PostMapping("create")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
import org.apache.streampark.console.core.entity.ApplicationLog;
import org.apache.streampark.console.core.entity.FlinkApplication;
import org.apache.streampark.console.core.enums.UserTypeEnum;
import org.apache.streampark.console.core.service.ApplicationLogService;
import org.apache.streampark.console.core.service.ProxyService;
import org.apache.streampark.console.core.service.application.ApplicationLogService;
import org.apache.streampark.console.core.service.application.FlinkApplicationManageService;
import org.apache.streampark.console.core.util.ServiceHelper;
import org.apache.streampark.console.system.entity.Member;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import org.apache.streampark.console.base.domain.RestResponse;
import org.apache.streampark.console.core.annotation.Permission;
import org.apache.streampark.console.core.entity.AppBuildPipeline;
import org.apache.streampark.console.core.service.SparkAppBuildPipeService;
import org.apache.streampark.console.core.service.application.SparkAppBuildPipeService;

import org.apache.shiro.authz.annotation.RequiresPermissions;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@
import org.apache.streampark.console.core.entity.SparkApplication;
import org.apache.streampark.console.core.entity.SparkApplicationLog;
import org.apache.streampark.console.core.enums.AppExistsStateEnum;
import org.apache.streampark.console.core.service.FlinkApplicationBackUpService;
import org.apache.streampark.console.core.service.ResourceService;
import org.apache.streampark.console.core.service.SparkApplicationLogService;
import org.apache.streampark.console.core.service.application.FlinkApplicationBackUpService;
import org.apache.streampark.console.core.service.application.SparkApplicationActionService;
import org.apache.streampark.console.core.service.application.SparkApplicationInfoService;
import org.apache.streampark.console.core.service.application.SparkApplicationLogService;
import org.apache.streampark.console.core.service.application.SparkApplicationManageService;

import org.apache.shiro.authz.annotation.RequiresPermissions;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.apache.streampark.console.base.domain.RestResponse;
import org.apache.streampark.console.core.entity.SparkApplication;
import org.apache.streampark.console.core.entity.SparkApplicationConfig;
import org.apache.streampark.console.core.service.SparkApplicationConfigService;
import org.apache.streampark.console.core.service.application.SparkApplicationConfigService;

import org.apache.shiro.authz.annotation.RequiresPermissions;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.apache.streampark.console.core.entity.SparkApplicationLog;
import org.apache.streampark.console.core.enums.UserTypeEnum;
import org.apache.streampark.console.core.service.ProxyService;
import org.apache.streampark.console.core.service.SparkApplicationLogService;
import org.apache.streampark.console.core.service.application.SparkApplicationLogService;
import org.apache.streampark.console.core.service.application.SparkApplicationManageService;
import org.apache.streampark.console.core.util.ServiceHelper;
import org.apache.streampark.console.system.entity.Member;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.streampark.console.core.entity;

import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;

import java.io.Serializable;
import java.util.Date;

@Data
@TableName("t_app")
@Slf4j
public class Application implements Serializable {

@TableId(type = IdType.AUTO)
private Long id;
/**
* 1: flink job
* 2: spark job
*/
private Integer jobType;

private Date createTime;

private Date modifyTime;

}
Loading

0 comments on commit e378ea2

Please sign in to comment.