Merge branch 'dev' into quickstart-readme
wolfboys authored Aug 24, 2024
2 parents 07f624f + 1c6d8fd commit c9f232d
Showing 32 changed files with 3,881 additions and 51 deletions.
@@ -35,6 +35,7 @@
 import java.io.Serializable;
 import java.nio.charset.StandardCharsets;
 import java.util.Date;
+import java.util.HashMap;
 import java.util.Map;
 
 @Data
@@ -74,8 +75,10 @@ public class SparkEnv implements Serializable {
   public void doSetSparkConf() throws ApiDetailException {
     try {
       File yaml = new File(this.sparkHome.concat("/conf/spark-defaults.conf"));
-      String sparkConf = FileUtils.readFileToString(yaml, StandardCharsets.UTF_8);
-      this.sparkConf = DeflaterUtils.zipString(sparkConf);
+      if (yaml.exists()) {
+        String sparkConf = FileUtils.readFileToString(yaml, StandardCharsets.UTF_8);
+        this.sparkConf = DeflaterUtils.zipString(sparkConf);
+      }
     } catch (Exception e) {
       throw new ApiDetailException(e);
     }
@@ -84,15 +87,12 @@ public void doSetSparkConf() throws ApiDetailException {
   public void doSetVersion() {
     this.setVersion(this.getSparkVersion().version());
     this.setScalaVersion(this.getSparkVersion().scalaVersion());
-    if (!streamParkScalaVersion.startsWith(this.getSparkVersion().scalaVersion())) {
-      throw new UnsupportedOperationException(
-          String.format(
-              "The current Scala version of StreamPark is %s, but the scala version of Spark to be added is %s, which does not match, Please check",
-              streamParkScalaVersion, this.getSparkVersion().scalaVersion()));
-    }
   }
 
   public Map<String, String> convertSparkYamlAsMap() {
+    if (sparkConf == null) {
+      return new HashMap<>();
+    }
     String sparkYamlString = DeflaterUtils.unzipString(sparkConf);
     return PropertiesUtils.loadFlinkConfYaml(sparkYamlString);
   }
@@ -106,7 +106,9 @@ public SparkVersion getSparkVersion() {
   }
 
   public void unzipSparkConf() {
-    this.sparkConf = DeflaterUtils.unzipString(this.sparkConf);
+    if (sparkConf != null) {
+      this.sparkConf = DeflaterUtils.unzipString(this.sparkConf);
+    }
   }
 
   public String getLargeVersion() {
@@ -341,7 +341,6 @@ public List<String> listHistoryUploadJars() {
         .sorted(Comparator.comparingLong(File::lastModified).reversed())
         .map(File::getName)
         .filter(fn -> fn.endsWith(Constant.JAR_SUFFIX))
-        .limit(DEFAULT_HISTORY_RECORD_LIMIT)
         .collect(Collectors.toList());
   }

@@ -80,8 +80,8 @@ insert into `t_menu` values (110118, 110100, 'app sql delete', null, null, 'sql:
 insert into `t_menu` values (110301, 110300, 'cluster add', '/flink/add_cluster', 'flink/cluster/Add', 'cluster:create', '', '0', 0, null, now(), now());
 insert into `t_menu` values (110302, 110300, 'cluster edit', '/flink/edit_cluster', 'flink/cluster/Edit', 'cluster:update', '', '0', 0, null, now(), now());
 
-insert into `t_menu` values (120100, 120000, 'spark.application', '/spark/app', 'spark/app/View', null, null, '0', 1, 2, now(), now());
-insert into `t_menu` values (120200, 120000, 'spark.sparkHome', '/spark/home', 'spark/home/View', null, null, '0', 1, 3, now(), now());
+insert into `t_menu` values (120100, 120000, 'spark.application', '/spark/app', 'spark/app/index', null, null, '0', 1, 2, now(), now());
+insert into `t_menu` values (120200, 120000, 'spark.sparkHome', '/spark/home', 'spark/home/index', null, null, '0', 1, 3, now(), now());
 
 insert into `t_menu` values (130100, 130000, 'resource.project', '/resource/project', 'resource/project/View', null, 'github', '0', 1, 2, now(), now());
 insert into `t_menu` values (130200, 130000, 'resource.variable', '/resource/variable', 'resource/variable/View', null, null, '0', 1, 3, now(), now());
@@ -515,7 +515,7 @@ create table if not exists `t_spark_env` (
   `spark_home` varchar(255) not null comment 'spark home path',
   `version` varchar(64) not null comment 'spark version',
   `scala_version` varchar(64) not null comment 'scala version of spark',
-  `spark_conf` text not null comment 'spark-conf',
+  `spark_conf` text default null comment 'spark-conf',
   `is_default` tinyint not null default 0 comment 'whether default version or not',
   `description` varchar(255) default null comment 'description',
   `create_time` datetime not null default current_timestamp comment 'create time',
182 changes: 182 additions & 0 deletions streampark-console/streampark-console-webapp/src/api/spark/app.ts
@@ -0,0 +1,182 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { AppListResponse, SparkApplication, DashboardResponse } from './app.type';
import type { Result } from '/#/axios';
import { ContentTypeEnum } from '/@/enums/httpEnum';
import type { AppExistsStateEnum } from '/@/enums/sparkEnum';
import { defHttp } from '/@/utils/http/axios';

const apiPrefix = `/spark/app`;

/**
* Get application information by id
* @param data object containing the application id
*/
export function fetchGetSparkApp(data: { id: string }) {
return defHttp.post<SparkApplication>({ url: `${apiPrefix}/get`, data });
}
/**
* Create a spark application
* @param data the application to create
*/
export function fetchCreateSparkApp(data: SparkApplication) {
return defHttp.post<boolean>({ url: `${apiPrefix}/create`, data });
}
/**
* Copy a spark application
* @param data the application to copy
*/
export function fetchCopySparkApp(data: SparkApplication) {
return defHttp.post({ url: `${apiPrefix}/copy`, data });
}
/**
* Update a spark application
* @param data the application to update
*/
export function fetchUpdateSparkApp(data: SparkApplication) {
return defHttp.post<boolean>({ url: `${apiPrefix}/update`, data });
}

/**
* Dashboard data
* @returns Promise<DashboardResponse>
*/
export function fetchSparkDashboard() {
return defHttp.post<DashboardResponse>({
url: `${apiPrefix}/dashboard`,
});
}

/**
* Get app list data
*/
export function fetchSparkAppRecord(data: Recordable) {
return defHttp.post<AppListResponse>({ url: `${apiPrefix}/list`, data });
}

/**
* Map the application to an existing job
* @param data { id: string, appId: string, jobId: string }
*/
export function fetchSparkMapping(data: SparkApplication) {
return defHttp.post<boolean>({ url: `${apiPrefix}/mapping`, data });
}

export function fetchSparkAppStart(data: SparkApplication) {
return defHttp.post<Result<boolean>>(
{ url: `${apiPrefix}/start`, data },
{ isTransformResponse: false },
);
}

export function fetchCheckSparkAppStart(data: SparkApplication) {
return defHttp.post<AppExistsStateEnum>({ url: `${apiPrefix}/check/start`, data });
}

/**
* Cancel
* @param {SparkApplication} data
*/
export function fetchSparkAppCancel(data: SparkApplication) {
return defHttp.post({ url: `${apiPrefix}/cancel`, data });
}

/**
* clean
* @param {SparkApplication} data
*/
export function fetchSparkAppClean(data: SparkApplication) {
return defHttp.post({ url: `${apiPrefix}/clean`, data });
}
/**
* Force stop the application
*/
export function fetchSparkAppForcedStop(data: SparkApplication) {
return defHttp.post({ url: `${apiPrefix}/forcedStop`, data });
}

/**
* get yarn address
*/
export function fetchSparkYarn() {
return defHttp.post<string>({ url: `${apiPrefix}/yarn` });
}

/**
* check spark name
*/
export function fetchCheckSparkName(data: { id?: string; jobName: string }) {
return defHttp.post<AppExistsStateEnum>({ url: `${apiPrefix}/check/name`, data });
}

/**
* read configuration file
*/
export function fetchSparkAppConf(params?: { config: any }) {
return defHttp.post<string>({
url: `${apiPrefix}/read_conf`,
params,
});
}

/**
* main
*/
export function fetchSparkMain(data: SparkApplication) {
return defHttp.post<string>({
url: `${apiPrefix}/main`,
data,
});
}

export function fetchSparkBackUps(data: SparkApplication) {
return defHttp.post({ url: `${apiPrefix}/backups`, data });
}

export function fetchSparkOptionLog(data: SparkApplication) {
return defHttp.post({ url: `${apiPrefix}/opt_log`, data });
}

export function fetchSparkDeleteOptLog(id: string) {
return defHttp.post({ url: `${apiPrefix}/delete/opt_log`, data: { id } });
}

/**
* remove the app
*/
export function fetchSparkAppRemove(id: string) {
return defHttp.post({ url: `${apiPrefix}/delete`, data: { id } });
}

export function fetchSparkRemoveBackup(id: string) {
return defHttp.post({ url: `${apiPrefix}/delete/bak`, data: { id } });
}

/**
* upload
* @param params
*/
export function fetchSparkUpload(params: any) {
return defHttp.post<string>({
url: `${apiPrefix}/upload`,
params,
headers: {
'Content-Type': ContentTypeEnum.FORM_DATA,
},
timeout: 1000 * 60 * 10, // 10-minute timeout for file uploads
});
}
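
For reference, a minimal sketch of how webapp code might call the new module (not part of this commit). The SparkApplication payload fields and the upload form-field name are assumptions; the authoritative types live in ./app.type and /#/axios.

import type { SparkApplication } from '/@/api/spark/app.type';
import { fetchCreateSparkApp, fetchSparkAppStart, fetchSparkUpload } from '/@/api/spark/app';

// Create an application, then submit it for execution.
async function createAndStart(app: SparkApplication) {
  const created = await fetchCreateSparkApp(app);
  if (!created) {
    return;
  }
  // fetchSparkAppStart sets isTransformResponse: false, so the raw
  // Result<boolean> envelope comes back instead of the unwrapped value;
  // its exact field names are defined in /#/axios, so just log it here.
  const result = await fetchSparkAppStart(app);
  console.log('start response:', result);
}

// Upload a jar as multipart form data; 'file' as the form-field name is
// an assumption about the backend contract.
async function uploadJar(jar: File) {
  const formData = new FormData();
  formData.append('file', jar);
  const serverPath = await fetchSparkUpload(formData);
  console.log('jar uploaded to', serverPath);
}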
