From af655696f3361946c792a32214908b3061675ba7 Mon Sep 17 00:00:00 2001 From: lenoxzhao <66543456+lenoxzhao@users.noreply.github.com> Date: Fri, 19 Jul 2024 15:43:08 +0800 Subject: [PATCH] [Feature] Support spark job status tracking (#3843) * feat: add spark job state tracking * feat: adjust base on updated code and add resource monitoring * fix: fix e2e build failure * feature: support spark parameters configuring * feature: change cancel operation to stop operation * fix: modify comment * improve: remove flink related feature in SparkApplicationInfoService --------- Co-authored-by: benjobs --- .../assembly/script/schema/mysql-schema.sql | 19 + .../base/config/AsyncExecutorPoolConfig.java | 16 + .../console/core/bean/AlertTemplate.java | 33 ++ .../SparkApplicationController.java | 14 +- .../console/core/entity/SparkApplication.java | 16 +- .../core/entity/SparkApplicationLog.java | 51 +++ .../console/core/enums/SparkAppStateEnum.java | 74 +--- .../core/enums/SparkOperationEnum.java | 39 ++ .../core/enums/SparkOptionStateEnum.java | 46 +++ .../mapper/SparkApplicationLogMapper.java | 25 ++ .../console/core/metrics/spark/Job.java | 67 +++ .../core/metrics/spark/SparkExecutor.java | 32 ++ .../service/SparkApplicationLogService.java | 44 ++ .../SparkApplicationActionService.java | 8 +- .../SparkApplicationInfoService.java | 33 -- .../SparkApplicationActionServiceImpl.java | 246 ++++------- .../impl/SparkApplicationInfoServiceImpl.java | 137 +----- .../impl/SparkAppBuildPipeServiceImpl.java | 20 +- .../impl/SparkApplicationLogServiceImpl.java | 57 +++ .../core/watcher/SparkAppHttpWatcher.java | 389 ++++++++++++++++++ .../streampark/spark/client/SparkClient.scala | 8 +- ...{CancelRequest.scala => StopRequest.scala} | 3 +- ...ancelResponse.scala => StopResponse.scala} | 2 +- .../spark/client/bean/SubmitResponse.scala | 1 + .../client/conf/SparkConfiguration.scala | 27 ++ .../spark/client/proxy/SparkShimsProxy.scala | 10 +- .../spark/client/SparkClientEndpoint.scala | 8 +- .../client/impl/YarnApplicationClient.scala | 102 ++--- .../spark/client/trait/SparkClientTrait.scala | 22 +- 29 files changed, 1059 insertions(+), 490 deletions(-) create mode 100644 streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkApplicationLog.java create mode 100644 streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/SparkOperationEnum.java create mode 100644 streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/SparkOptionStateEnum.java create mode 100644 streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/SparkApplicationLogMapper.java create mode 100644 streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/metrics/spark/Job.java create mode 100644 streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/metrics/spark/SparkExecutor.java create mode 100644 streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SparkApplicationLogService.java create mode 100644 streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SparkApplicationLogServiceImpl.java create mode 100644 streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/SparkAppHttpWatcher.java rename streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/{CancelRequest.scala => StopRequest.scala} (96%) rename streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/{CancelResponse.scala => StopResponse.scala} (94%) create mode 100644 streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/conf/SparkConfiguration.scala diff --git a/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql b/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql index 41495bc71f..69abc8634d 100644 --- a/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql +++ b/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql @@ -632,4 +632,23 @@ create table `t_spark_app` ( index `inx_team` (`team_id`) using btree ) engine=innodb auto_increment=100000 default charset=utf8mb4 collate=utf8mb4_general_ci; + +-- ---------------------------- +-- table structure for t_spark_log +-- ---------------------------- +drop table if exists `t_spark_log`; +create table `t_spark_log` ( + `id` bigint not null auto_increment, + `app_id` bigint default null, + `spark_app_id` varchar(64) collate utf8mb4_general_ci default null, + `track_url` varchar(255) collate utf8mb4_general_ci default null, + `success` tinyint default null, + `exception` text collate utf8mb4_general_ci, + `option_time` datetime default null, + `option_name` tinyint default null, + `user_id` bigint default null, + primary key (`id`) using btree +) engine=innodb auto_increment=100000 default charset=utf8mb4 collate=utf8mb4_general_ci; + + set foreign_key_checks = 1; diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/config/AsyncExecutorPoolConfig.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/config/AsyncExecutorPoolConfig.java index 98d9f3f9c7..5d51b3df19 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/config/AsyncExecutorPoolConfig.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/config/AsyncExecutorPoolConfig.java @@ -66,6 +66,22 @@ public Executor restAPIWatchingExecutor() { ThreadUtils.threadFactory("flink-restapi-watching-executor-")); } + /** + * Create a ThreadPoolTaskExecutor for SparkAppHttpWatcher. + * + * @return Executor + */ + @Bean("sparkRestAPIWatchingExecutor") + public Executor sparkRestAPIWatchingExecutor() { + return new ThreadPoolExecutor( + Runtime.getRuntime().availableProcessors() * 5, + Runtime.getRuntime().availableProcessors() * 10, + 60L, + TimeUnit.SECONDS, + new LinkedBlockingQueue<>(1024), + ThreadUtils.threadFactory("spark-cluster-watching-executor-")); + } + /** * Create a ThreadPoolTaskExecutor for FlinkClusterWatcher. * diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/AlertTemplate.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/AlertTemplate.java index dee7241b3c..ce24a1e9f3 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/AlertTemplate.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/AlertTemplate.java @@ -19,12 +19,15 @@ import org.apache.streampark.common.enums.ClusterState; import org.apache.streampark.common.enums.FlinkExecutionMode; +import org.apache.streampark.common.enums.SparkExecutionMode; import org.apache.streampark.common.util.DateUtils; import org.apache.streampark.common.util.YarnUtils; import org.apache.streampark.console.core.entity.Application; import org.apache.streampark.console.core.entity.FlinkCluster; +import org.apache.streampark.console.core.entity.SparkApplication; import org.apache.streampark.console.core.enums.CheckPointStatusEnum; import org.apache.streampark.console.core.enums.FlinkAppStateEnum; +import org.apache.streampark.console.core.enums.SparkAppStateEnum; import lombok.Data; @@ -140,6 +143,25 @@ public static AlertTemplate of(AlertProbeMsg alertProbeMsg) { .build(); } + public static AlertTemplate of(SparkApplication application, SparkAppStateEnum appState) { + return new AlertTemplateBuilder() + .setDuration(application.getStartTime(), application.getEndTime()) + .setJobName(application.getJobName()) + .setLink(application.getSparkExecutionMode(), application.getJobId()) + .setStartTime(application.getStartTime()) + .setEndTime(application.getEndTime()) + .setRestart(application.isNeedRestartOnFailed(), application.getRestartCount()) + .setRestartIndex(application.getRestartCount()) + .setTotalRestart(application.getRestartSize()) + .setType(1) + .setTitle( + String.format( + "%s %s %s", ALERT_TITLE_PREFIX, application.getJobName(), appState.name())) + .setSubject( + String.format("%s %s %s", ALERT_SUBJECT_PREFIX, application.getJobName(), appState)) + .setStatus(appState.name()) + .build(); + } private static class AlertTemplateBuilder { private final AlertTemplate alertTemplate = new AlertTemplate(); @@ -218,6 +240,17 @@ public AlertTemplateBuilder setLink(FlinkExecutionMode mode, String appId) { return this; } + public AlertTemplateBuilder setLink(SparkExecutionMode mode, String appId) { + if (SparkExecutionMode.isYarnMode(mode)) { + String format = "%s/proxy/%s/"; + String url = String.format(format, YarnUtils.getRMWebAppURL(false), appId); + alertTemplate.setLink(url); + } else { + alertTemplate.setLink(null); + } + return this; + } + public AlertTemplateBuilder setCpFailureRateInterval(String cpFailureRateInterval) { alertTemplate.setCpFailureRateInterval(cpFailureRateInterval); return this; diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SparkApplicationController.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SparkApplicationController.java index 4063710f62..2b413c760c 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SparkApplicationController.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SparkApplicationController.java @@ -24,12 +24,12 @@ import org.apache.streampark.console.base.exception.InternalException; import org.apache.streampark.console.core.annotation.AppUpdated; import org.apache.streampark.console.core.entity.ApplicationBackUp; -import org.apache.streampark.console.core.entity.ApplicationLog; import org.apache.streampark.console.core.entity.SparkApplication; +import org.apache.streampark.console.core.entity.SparkApplicationLog; import org.apache.streampark.console.core.enums.AppExistsStateEnum; import org.apache.streampark.console.core.service.ApplicationBackUpService; -import org.apache.streampark.console.core.service.ApplicationLogService; import org.apache.streampark.console.core.service.ResourceService; +import org.apache.streampark.console.core.service.SparkApplicationLogService; import org.apache.streampark.console.core.service.application.SparkApplicationActionService; import org.apache.streampark.console.core.service.application.SparkApplicationInfoService; import org.apache.streampark.console.core.service.application.SparkApplicationManageService; @@ -70,7 +70,7 @@ public class SparkApplicationController { private ApplicationBackUpService backUpService; @Autowired - private ApplicationLogService applicationLogService; + private SparkApplicationLogService applicationLogService; @Autowired private ResourceService resourceService; @@ -153,8 +153,8 @@ public RestResponse start(SparkApplication app) { @PostMapping(value = "cancel") @RequiresPermissions("app:cancel") - public RestResponse cancel(SparkApplication app) throws Exception { - applicationActionService.cancel(app); + public RestResponse stop(SparkApplication app) throws Exception { + applicationActionService.stop(app); return RestResponse.success(); } @@ -209,8 +209,8 @@ public RestResponse backups(ApplicationBackUp backUp, RestRequest request) { } @PostMapping("optionlog") - public RestResponse optionlog(ApplicationLog applicationLog, RestRequest request) { - IPage applicationList = applicationLogService.getPage(applicationLog, request); + public RestResponse optionlog(SparkApplicationLog applicationLog, RestRequest request) { + IPage applicationList = applicationLogService.getPage(applicationLog, request); return RestResponse.success(applicationList); } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkApplication.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkApplication.java index 9c4be43e56..c07e2b1602 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkApplication.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkApplication.java @@ -32,6 +32,7 @@ import org.apache.streampark.console.core.enums.FlinkAppStateEnum; import org.apache.streampark.console.core.enums.ReleaseStateEnum; import org.apache.streampark.console.core.enums.ResourceFromEnum; +import org.apache.streampark.console.core.enums.SparkAppStateEnum; import org.apache.streampark.console.core.metrics.flink.JobsOverview; import org.apache.streampark.console.core.utils.YarnQueueLabelExpression; import org.apache.streampark.flink.kubernetes.model.K8sPodTemplates; @@ -285,19 +286,16 @@ public void setYarnQueueByHotParams() { } /** - * Determine if a FlinkAppState requires tracking. + * Determine if a SparkAppState requires tracking. * * @return 1: need to be tracked | 0: no need to be tracked. */ public Boolean shouldTracking() { switch (getStateEnum()) { case ADDED: - case CREATED: case FINISHED: case FAILED: - case CANCELED: - case TERMINATED: - case POS_TERMINATED: + case KILLED: return false; default: return true; @@ -312,15 +310,11 @@ public Boolean shouldTracking() { public boolean isCanBeStart() { switch (getStateEnum()) { case ADDED: - case CREATED: case FAILED: - case CANCELED: case FINISHED: case LOST: - case TERMINATED: case SUCCEEDED: case KILLED: - case POS_TERMINATED: return true; default: return false; @@ -338,8 +332,8 @@ public FlinkDevelopmentMode getDevelopmentMode() { } @JsonIgnore - public FlinkAppStateEnum getStateEnum() { - return FlinkAppStateEnum.of(state); + public SparkAppStateEnum getStateEnum() { + return SparkAppStateEnum.of(state); } @JsonIgnore diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkApplicationLog.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkApplicationLog.java new file mode 100644 index 0000000000..8c51c92ed5 --- /dev/null +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkApplicationLog.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.console.core.entity; + +import com.baomidou.mybatisplus.annotation.IdType; +import com.baomidou.mybatisplus.annotation.TableId; +import com.baomidou.mybatisplus.annotation.TableName; +import lombok.Data; +import lombok.extern.slf4j.Slf4j; + +import java.util.Date; + +@Data +@TableName("t_spark_log") +@Slf4j +public class SparkApplicationLog { + + @TableId(type = IdType.AUTO) + private Long id; + /** appId */ + private Long appId; + /** applicationId */ + private String sparkAppId; + /** tracking url of current spark application */ + private String trackUrl; + /** start status */ + private Boolean success; + /** option name */ + private Integer optionName; + /** option time */ + private Date optionTime; + /** exception at the start */ + private String exception; + /** The user who operates the application */ + private Long userId; +} diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/SparkAppStateEnum.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/SparkAppStateEnum.java index 822e32ba6b..9e98476701 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/SparkAppStateEnum.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/SparkAppStateEnum.java @@ -26,75 +26,47 @@ public enum SparkAppStateEnum { /** Added new job to database. */ ADDED(0), - /** - * The job has been received by the Dispatcher, and is waiting for the job manager to be created. - */ - INITIALIZING(1), + /** (From Yarn)Application which was just created. */ + NEW(1), - /** Job is newly created, no task has started to run. */ - CREATED(2), + /** (From Yarn)Application which is being saved. */ + NEW_SAVING(2), /** Application which is currently running. */ STARTING(3), - /** Application which is currently running. */ - RESTARTING(4), + /** (From Yarn)Application which has been submitted. */ + SUBMITTED(4), - /** Some tasks are scheduled or running, some may be pending, some may be finished. */ - RUNNING(5), + /** (From Yarn)Application has been accepted by the scheduler. */ + ACCEPTED(5), /** The job has failed and is currently waiting for the cleanup to complete. */ - FAILING(6), - - /** The job has failed with a non-recoverable task failure. */ - FAILED(7), - - /** Job is being cancelled. */ - CANCELLING(8), - - /** Job has been cancelled. */ - CANCELED(9), + RUNNING(6), - /** All the job's tasks have successfully finished. */ - FINISHED(10), + /** (From Yarn)Application which finished successfully. */ + FINISHED(7), - /** - * The job has been suspended which means that it has been stopped but not been removed from a - * potential HA job store. - */ - SUSPENDED(11), - - /** The job is currently reconciling and waits for task execution report to recover state. */ - RECONCILING(12), + /** (From Yarn)Application which failed. */ + FAILED(8), /** Loss of mapping. */ - LOST(13), + LOST(9), /** Mapping. */ - MAPPING(14), + MAPPING(10), /** Other statuses. */ - OTHER(15), + OTHER(11), /** Has rollback. */ - REVOKED(16), - - /** - * Lost track of Spark job temporarily. A complete loss of Spark job tracking translates into LOST - * state. - */ - @Deprecated - SILENT(17), - - /** Spark job has terminated vaguely, maybe FINISHED, CANCELED or FAILED. */ - TERMINATED(18), + REVOKED(12), - /** Spark job has terminated vaguely, maybe FINISHED, CANCELED or FAILED. */ - @Deprecated - POS_TERMINATED(19), + /** Spark job has being cancelling(killing) by streampark */ + STOPPING(13), /** Job SUCCEEDED on yarn. */ - SUCCEEDED(20), + SUCCEEDED(14), /** Has killed in Yarn. */ KILLED(-9); @@ -125,13 +97,11 @@ public static SparkAppStateEnum of(String name) { public static boolean isEndState(Integer appState) { SparkAppStateEnum sparkAppStateEnum = SparkAppStateEnum.of(appState); - return SparkAppStateEnum.CANCELED == sparkAppStateEnum - || SparkAppStateEnum.FAILED == sparkAppStateEnum + return SparkAppStateEnum.FAILED == sparkAppStateEnum || SparkAppStateEnum.KILLED == sparkAppStateEnum || SparkAppStateEnum.FINISHED == sparkAppStateEnum || SparkAppStateEnum.SUCCEEDED == sparkAppStateEnum - || SparkAppStateEnum.LOST == sparkAppStateEnum - || SparkAppStateEnum.TERMINATED == sparkAppStateEnum; + || SparkAppStateEnum.LOST == sparkAppStateEnum; } public static boolean isLost(Integer appState) { diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/SparkOperationEnum.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/SparkOperationEnum.java new file mode 100644 index 0000000000..4d9ee1ae2b --- /dev/null +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/SparkOperationEnum.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.console.core.enums; + +import lombok.Getter; + +import java.util.Arrays; + +/** Spark Operation type */ +@Getter +public enum SparkOperationEnum { + + RELEASE(0), START(1), STOP(2); + + private final int value; + + SparkOperationEnum(int value) { + this.value = value; + } + + public static SparkOperationEnum of(Integer option) { + return Arrays.stream(values()).filter((x) -> x.value == option).findFirst().orElse(null); + } +} diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/SparkOptionStateEnum.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/SparkOptionStateEnum.java new file mode 100644 index 0000000000..b3a3e03e89 --- /dev/null +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/SparkOptionStateEnum.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.console.core.enums; + +import lombok.Getter; + +import java.util.Arrays; + +/** Option status */ +@Getter +public enum SparkOptionStateEnum { + + /** Application which is currently action: none. */ + NONE(0), + /** Application which is currently action: releasing. */ + RELEASING(1), + /** Application which is currently action: starting. */ + STARTING(2), + /** Application which is currently action: stopping. */ + STOPPING(3); + + private final int value; + + SparkOptionStateEnum(int value) { + this.value = value; + } + + public static SparkOptionStateEnum of(Integer state) { + return Arrays.stream(values()).filter((x) -> x.value == state).findFirst().orElse(null); + } +} diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/SparkApplicationLogMapper.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/SparkApplicationLogMapper.java new file mode 100644 index 0000000000..c9e77768a5 --- /dev/null +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/SparkApplicationLogMapper.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.console.core.mapper; + +import org.apache.streampark.console.core.entity.SparkApplicationLog; + +import com.baomidou.mybatisplus.core.mapper.BaseMapper; + +public interface SparkApplicationLogMapper extends BaseMapper { +} diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/metrics/spark/Job.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/metrics/spark/Job.java new file mode 100644 index 0000000000..be28f66316 --- /dev/null +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/metrics/spark/Job.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.console.core.metrics.spark; + +import com.fasterxml.jackson.annotation.JsonProperty; +import lombok.Data; + +import java.io.Serializable; +import java.util.List; + +import scala.collection.Map; + +@Data +public class Job implements Serializable { + + @JsonProperty("jobId") + private Long id; + + private String name; + + private String submissionTime; + + private String completionTime; + + private List stageIds; + + private String status; + + private Integer numTasks; + + private Integer numActiveTasks; + + private Integer numCompletedTasks; + + private Integer numSkippedTasks; + + private Integer numFailedTasks; + + private Integer numKilledTasks; + + private Integer numCompletedIndices; + + private Integer numActiveStages; + + private Integer numCompletedStages; + + private Integer numSkippedStages; + + private Integer numFailedStages; + + private Map killedTasksSummary; +} diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/metrics/spark/SparkExecutor.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/metrics/spark/SparkExecutor.java new file mode 100644 index 0000000000..12e0c5e220 --- /dev/null +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/metrics/spark/SparkExecutor.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.console.core.metrics.spark; + +import lombok.Data; + +import java.io.Serializable; + +@Data +public class SparkExecutor implements Serializable { + + private Long memoryUsed; + + private Long maxMemory; + + private Long totalCores; +} diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SparkApplicationLogService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SparkApplicationLogService.java new file mode 100644 index 0000000000..6edee09ac8 --- /dev/null +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SparkApplicationLogService.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.console.core.service; + +import org.apache.streampark.console.base.domain.RestRequest; +import org.apache.streampark.console.core.entity.SparkApplicationLog; + +import com.baomidou.mybatisplus.core.metadata.IPage; +import com.baomidou.mybatisplus.extension.service.IService; + +/** This interface is used to record spark application operation logs */ +public interface SparkApplicationLogService extends IService { + + /** + * Retrieves a page of {@link SparkApplicationLog} objects based on the provided parameters. + * + * @param sparkApplicationLog The {@link SparkApplicationLog} object containing the search criteria. + * @param request The {@link RestRequest} object used for pagination and sorting. + * @return An {@link IPage} containing the retrieved {@link SparkApplicationLog} objects. + */ + IPage getPage(SparkApplicationLog sparkApplicationLog, RestRequest request); + + /** + * remove application log by application id + * + * @param appId The id of the application to be removed + */ + void removeByAppId(Long appId); +} diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/SparkApplicationActionService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/SparkApplicationActionService.java index 3a1c086ca1..682717cbec 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/SparkApplicationActionService.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/SparkApplicationActionService.java @@ -54,12 +54,12 @@ public interface SparkApplicationActionService extends IService */ boolean existsBySparkEnvId(Long sparkEnvId); - /** - * Checks if a job is running for a given cluster ID. - * - * @param clusterId The ID of the cluster. - * @return true if a job is running for the given cluster ID; otherwise, false. - */ - boolean existsRunningByClusterId(Long clusterId); - - /** - * Checks if there is a job that is associated with the given cluster ID. - * - * @param clusterId The ID of the cluster. - * @return True if a job exists for the given cluster ID, false otherwise. - */ - boolean existsByClusterId(Long clusterId); - - /** - * Counts the number of items associated with the given cluster ID. - * - * @param clusterId The ID of the cluster. - * @return The number of items associated with the given cluster ID. - */ - Integer countByClusterId(Long clusterId); - - /** - * Counts the number of items associated with the given cluster ID and database type. - * - * @param clusterId The ID of the cluster. - * @param dbType The type of the database. - * @return The number of items associated with the given cluster ID and database type. - */ - Integer countAffectedByClusterId(Long clusterId, String dbType); - /** * Gets the YARN name for the given application. * diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationActionServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationActionServiceImpl.java index 46e447c75b..4ddb22c517 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationActionServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationActionServiceImpl.java @@ -21,9 +21,7 @@ import org.apache.streampark.common.conf.ConfigKeys; import org.apache.streampark.common.conf.Workspace; import org.apache.streampark.common.enums.ApplicationType; -import org.apache.streampark.common.enums.ClusterState; import org.apache.streampark.common.enums.FlinkDevelopmentMode; -import org.apache.streampark.common.enums.ResolveOrder; import org.apache.streampark.common.enums.SparkExecutionMode; import org.apache.streampark.common.fs.FsOperator; import org.apache.streampark.common.util.AssertUtils; @@ -31,52 +29,46 @@ import org.apache.streampark.common.util.ExceptionUtils; import org.apache.streampark.common.util.HadoopUtils; import org.apache.streampark.common.util.PropertiesUtils; +import org.apache.streampark.common.util.YarnUtils; import org.apache.streampark.console.base.exception.ApiAlertException; import org.apache.streampark.console.base.exception.ApplicationException; import org.apache.streampark.console.core.entity.AppBuildPipeline; import org.apache.streampark.console.core.entity.ApplicationConfig; -import org.apache.streampark.console.core.entity.ApplicationLog; -import org.apache.streampark.console.core.entity.FlinkCluster; import org.apache.streampark.console.core.entity.FlinkSql; import org.apache.streampark.console.core.entity.Resource; -import org.apache.streampark.console.core.entity.SavePoint; import org.apache.streampark.console.core.entity.SparkApplication; +import org.apache.streampark.console.core.entity.SparkApplicationLog; import org.apache.streampark.console.core.entity.SparkEnv; -import org.apache.streampark.console.core.enums.CheckPointTypeEnum; import org.apache.streampark.console.core.enums.ConfigFileTypeEnum; -import org.apache.streampark.console.core.enums.FlinkAppStateEnum; -import org.apache.streampark.console.core.enums.OperationEnum; -import org.apache.streampark.console.core.enums.OptionStateEnum; import org.apache.streampark.console.core.enums.ReleaseStateEnum; +import org.apache.streampark.console.core.enums.SparkAppStateEnum; +import org.apache.streampark.console.core.enums.SparkOperationEnum; +import org.apache.streampark.console.core.enums.SparkOptionStateEnum; import org.apache.streampark.console.core.mapper.SparkApplicationMapper; import org.apache.streampark.console.core.service.AppBuildPipeService; import org.apache.streampark.console.core.service.ApplicationConfigService; -import org.apache.streampark.console.core.service.ApplicationLogService; -import org.apache.streampark.console.core.service.FlinkClusterService; import org.apache.streampark.console.core.service.FlinkSqlService; import org.apache.streampark.console.core.service.ResourceService; import org.apache.streampark.console.core.service.ServiceHelper; +import org.apache.streampark.console.core.service.SparkApplicationLogService; import org.apache.streampark.console.core.service.SparkEnvService; import org.apache.streampark.console.core.service.VariableService; import org.apache.streampark.console.core.service.application.SparkApplicationActionService; import org.apache.streampark.console.core.service.application.SparkApplicationInfoService; -import org.apache.streampark.console.core.watcher.FlinkAppHttpWatcher; -import org.apache.streampark.console.core.watcher.FlinkClusterWatcher; +import org.apache.streampark.console.core.watcher.SparkAppHttpWatcher; import org.apache.streampark.flink.packer.pipeline.BuildResult; import org.apache.streampark.flink.packer.pipeline.ShadedBuildResponse; import org.apache.streampark.spark.client.SparkClient; -import org.apache.streampark.spark.client.bean.CancelRequest; -import org.apache.streampark.spark.client.bean.CancelResponse; +import org.apache.streampark.spark.client.bean.StopRequest; +import org.apache.streampark.spark.client.bean.StopResponse; import org.apache.streampark.spark.client.bean.SubmitRequest; import org.apache.streampark.spark.client.bean.SubmitResponse; import org.apache.commons.lang3.StringUtils; import org.apache.flink.api.common.JobID; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.MemorySize; -import org.apache.flink.configuration.RestOptions; -import org.apache.flink.runtime.jobgraph.SavepointConfigOptions; +import org.apache.hadoop.service.Service.STATE; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.client.api.YarnClient; @@ -91,7 +83,6 @@ import org.springframework.stereotype.Service; import java.io.File; -import java.net.URI; import java.util.Date; import java.util.EnumSet; import java.util.HashMap; @@ -104,6 +95,8 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; +import static org.apache.hadoop.service.Service.STATE.STARTED; + @Slf4j @Service public class SparkApplicationActionServiceImpl @@ -123,7 +116,7 @@ public class SparkApplicationActionServiceImpl private ApplicationConfigService configService; @Autowired - private ApplicationLogService applicationLogService; + private SparkApplicationLogService applicationLogService; @Autowired private SparkEnvService sparkEnvService; @@ -137,21 +130,15 @@ public class SparkApplicationActionServiceImpl @Autowired private AppBuildPipeService appBuildPipeService; - @Autowired - private FlinkClusterService flinkClusterService; - @Autowired private VariableService variableService; @Autowired private ResourceService resourceService; - @Autowired - private FlinkClusterWatcher flinkClusterWatcher; - private final Map> startFutureMap = new ConcurrentHashMap<>(); - private final Map> cancelFutureMap = new ConcurrentHashMap<>(); + private final Map> stopFutureMap = new ConcurrentHashMap<>(); @Override public void revoke(Long appId) throws ApplicationException { @@ -174,104 +161,75 @@ public void revoke(Long appId) throws ApplicationException { updateWrapper.set(SparkApplication::getRelease, ReleaseStateEnum.NEED_RELEASE.get()); } if (!application.isRunning()) { - updateWrapper.set(SparkApplication::getState, FlinkAppStateEnum.REVOKED.getValue()); + updateWrapper.set(SparkApplication::getState, SparkAppStateEnum.REVOKED.getValue()); } baseMapper.update(null, updateWrapper); } @Override public void restart(SparkApplication appParam) throws Exception { - this.cancel(appParam); + this.stop(appParam); this.start(appParam, false); } @Override public void forcedStop(Long id) { CompletableFuture startFuture = startFutureMap.remove(id); - CompletableFuture cancelFuture = cancelFutureMap.remove(id); + CompletableFuture stopFuture = stopFutureMap.remove(id); SparkApplication application = this.baseMapper.selectApp(id); if (startFuture != null) { startFuture.cancel(true); } - if (cancelFuture != null) { - cancelFuture.cancel(true); + if (stopFuture != null) { + stopFuture.cancel(true); } - if (startFuture == null && cancelFuture == null) { + if (startFuture == null && stopFuture == null) { this.doStopped(id); } } @Override - public void cancel(SparkApplication appParam) throws Exception { - FlinkAppHttpWatcher.setOptionState(appParam.getId(), OptionStateEnum.CANCELLING); + public void stop(SparkApplication appParam) throws Exception { + SparkAppHttpWatcher.setOptionState(appParam.getId(), SparkOptionStateEnum.STOPPING); SparkApplication application = getById(appParam.getId()); - application.setState(FlinkAppStateEnum.CANCELLING.getValue()); + application.setState(SparkAppStateEnum.STOPPING.getValue()); - ApplicationLog applicationLog = new ApplicationLog(); - applicationLog.setOptionName(OperationEnum.CANCEL.getValue()); + SparkApplicationLog applicationLog = new SparkApplicationLog(); + applicationLog.setOptionName(SparkOperationEnum.STOP.getValue()); applicationLog.setAppId(application.getId()); - applicationLog.setJobManagerUrl(application.getJobManagerUrl()); + applicationLog.setTrackUrl(application.getJobManagerUrl()); applicationLog.setOptionTime(new Date()); - applicationLog.setYarnAppId(application.getClusterId()); + applicationLog.setSparkAppId(application.getJobId()); applicationLog.setUserId(serviceHelper.getUserId()); - - if (appParam.getSavePointed()) { - FlinkAppHttpWatcher.addSavepoint(application.getId()); - application.setOptionState(OptionStateEnum.SAVEPOINTING.getValue()); - } else { - application.setOptionState(OptionStateEnum.CANCELLING.getValue()); - } - application.setOptionTime(new Date()); this.baseMapper.updateById(application); Long userId = serviceHelper.getUserId(); if (!application.getUserId().equals(userId)) { - FlinkAppHttpWatcher.addCanceledApp(application.getId(), userId); + SparkAppHttpWatcher.addCanceledApp(application.getId(), userId); } SparkEnv sparkEnv = sparkEnvService.getById(application.getVersionId()); - String clusterId = null; - if (SparkExecutionMode.isYarnMode(application.getExecutionMode())) { - clusterId = application.getAppId(); - } - Map properties = new HashMap<>(); - if (SparkExecutionMode.isRemoteMode(application.getSparkExecutionMode())) { - FlinkCluster cluster = flinkClusterService.getById(application.getSparkClusterId()); - ApiAlertException.throwIfNull( - cluster, - String.format( - "The clusterId=%s cannot be find, maybe the clusterId is wrong or " - + "the cluster has been deleted. Please contact the Admin.", - application.getSparkClusterId())); - URI activeAddress = cluster.getRemoteURI(); - properties.put(RestOptions.ADDRESS.key(), activeAddress.getHost()); - properties.put(RestOptions.PORT.key(), activeAddress.getPort()); - } - - CancelRequest cancelRequest = new CancelRequest( - application.getId(), - sparkEnv.getSparkVersion(), - SparkExecutionMode.of(application.getExecutionMode()), - properties, - clusterId, - application.getJobId(), - appParam.getDrain(), - appParam.getNativeFormat()); - - final Date triggerTime = new Date(); - CompletableFuture cancelFuture = CompletableFuture - .supplyAsync(() -> SparkClient.cancel(cancelRequest), executorService); - - cancelFutureMap.put(application.getId(), cancelFuture); - - cancelFuture.whenComplete( + StopRequest stopRequest = + new StopRequest( + application.getId(), + sparkEnv.getSparkVersion(), + SparkExecutionMode.of(application.getExecutionMode()), + properties, + application.getJobId(), + appParam.getDrain(), + appParam.getNativeFormat()); + + CompletableFuture stopFuture = + CompletableFuture.supplyAsync(() -> SparkClient.stop(stopRequest), executorService); + + stopFutureMap.put(application.getId(), stopFuture); + stopFuture.whenComplete( (cancelResponse, throwable) -> { - cancelFutureMap.remove(application.getId()); - + stopFutureMap.remove(application.getId()); if (throwable != null) { String exception = ExceptionUtils.stringifyException(throwable); applicationLog.setException(exception); @@ -281,31 +239,17 @@ public void cancel(SparkApplication appParam) throws Exception { if (throwable instanceof CancellationException) { doStopped(application.getId()); } else { - log.error("stop flink job failed.", throwable); - application.setOptionState(OptionStateEnum.NONE.getValue()); - application.setState(FlinkAppStateEnum.FAILED.getValue()); + log.error("stop spark job failed.", throwable); + application.setOptionState(SparkOptionStateEnum.NONE.getValue()); + application.setState(SparkAppStateEnum.FAILED.getValue()); updateById(application); - - FlinkAppHttpWatcher.unWatching(application.getId()); + SparkAppHttpWatcher.unWatching(application.getId()); } return; } - applicationLog.setSuccess(true); // save log... applicationLogService.save(applicationLog); - - if (cancelResponse != null && cancelResponse.savePointDir() != null) { - String savePointDir = cancelResponse.savePointDir(); - log.info("savePoint path: {}", savePointDir); - SavePoint savePoint = new SavePoint(); - savePoint.setPath(savePointDir); - savePoint.setAppId(application.getId()); - savePoint.setLatest(true); - savePoint.setType(CheckPointTypeEnum.SAVEPOINT.get()); - savePoint.setCreateTime(new Date()); - savePoint.setTriggerTime(triggerTime); - } }); } @@ -317,24 +261,16 @@ public void start(SparkApplication appParam, boolean auto) throws Exception { ApiAlertException.throwIfTrue( !application.isCanBeStart(), "[StreamPark] The application cannot be started repeatedly."); - if (SparkExecutionMode.isRemoteMode(application.getSparkExecutionMode())) { - checkBeforeStart(application); - } + SparkEnv sparkEnv = sparkEnvService.getByIdOrDefault(application.getVersionId()); + ApiAlertException.throwIfNull(sparkEnv, "[StreamPark] can no found spark version"); if (SparkExecutionMode.isYarnMode(application.getSparkExecutionMode())) { - - ApiAlertException.throwIfTrue( - !applicationInfoService.getYarnAppReport(application.getJobName()).isEmpty(), - "[StreamPark] The same task name is already running in the yarn queue"); + checkYarnBeforeStart(application); } AppBuildPipeline buildPipeline = appBuildPipeService.getById(application.getId()); AssertUtils.notNull(buildPipeline); - SparkEnv sparkEnv = sparkEnvService.getByIdOrDefault(application.getVersionId()); - - ApiAlertException.throwIfNull(sparkEnv, "[StreamPark] can no found flink version"); - // if manually started, clear the restart flag if (!auto) { application.setRestartCount(0); @@ -342,7 +278,6 @@ public void start(SparkApplication appParam, boolean auto) throws Exception { if (!application.isNeedRestartOnFailed()) { return; } - appParam.setSavePointed(true); application.setRestartCount(application.getRestartCount() + 1); } @@ -350,8 +285,8 @@ public void start(SparkApplication appParam, boolean auto) throws Exception { starting(application); String jobId = new JobID().toHexString(); - ApplicationLog applicationLog = new ApplicationLog(); - applicationLog.setOptionName(OperationEnum.START.getValue()); + SparkApplicationLog applicationLog = new SparkApplicationLog(); + applicationLog.setOptionName(SparkOperationEnum.START.getValue()); applicationLog.setAppId(application.getId()); applicationLog.setOptionTime(new Date()); applicationLog.setUserId(serviceHelper.getUserId()); @@ -376,6 +311,7 @@ public void start(SparkApplication appParam, boolean auto) throws Exception { if (SparkExecutionMode.YARN_CLUSTER == application.getSparkExecutionMode() || SparkExecutionMode.YARN_CLIENT == application.getSparkExecutionMode()) { buildResult = new ShadedBuildResponse(null, flinkUserJar, true); + application.setJobManagerUrl(YarnUtils.getRMWebAppURL(true)); } // Get the args after placeholder replacement @@ -401,7 +337,6 @@ public void start(SparkApplication appParam, boolean auto) throws Exception { .supplyAsync(() -> SparkClient.submit(submitRequest), executorService); startFutureMap.put(application.getId(), future); - future.whenComplete( (response, throwable) -> { // 1) remove Future @@ -417,16 +352,17 @@ public void start(SparkApplication appParam, boolean auto) throws Exception { doStopped(application.getId()); } else { SparkApplication app = getById(appParam.getId()); - app.setState(FlinkAppStateEnum.FAILED.getValue()); - app.setOptionState(OptionStateEnum.NONE.getValue()); + app.setState(SparkAppStateEnum.FAILED.getValue()); + app.setOptionState(SparkOptionStateEnum.NONE.getValue()); updateById(app); - FlinkAppHttpWatcher.unWatching(appParam.getId()); + SparkAppHttpWatcher.unWatching(appParam.getId()); } return; } // 3) success applicationLog.setSuccess(true); + // TODO:修改为spark对应的参数 if (response.sparkConfig() != null) { String jmMemory = response.sparkConfig().get(ConfigKeys.KEY_FLINK_JM_PROCESS_MEMORY()); if (jmMemory != null) { @@ -438,21 +374,21 @@ public void start(SparkApplication appParam, boolean auto) throws Exception { } } application.setAppId(response.clusterId()); - if (StringUtils.isNoneEmpty(response.jobId())) { - application.setJobId(response.jobId()); + if (StringUtils.isNoneEmpty(response.sparkAppId())) { + application.setJobId(response.sparkAppId()); } if (StringUtils.isNoneEmpty(response.jobManagerUrl())) { application.setJobManagerUrl(response.jobManagerUrl()); - applicationLog.setJobManagerUrl(response.jobManagerUrl()); + applicationLog.setTrackUrl(response.jobManagerUrl()); } - applicationLog.setYarnAppId(response.clusterId()); + applicationLog.setSparkAppId(response.sparkAppId()); application.setStartTime(new Date()); application.setEndTime(null); // if start completed, will be added task to tracking queue - FlinkAppHttpWatcher.setOptionState(appParam.getId(), OptionStateEnum.STARTING); - // FlinkAppHttpWatcher.doWatching(application); + SparkAppHttpWatcher.setOptionState(appParam.getId(), SparkOptionStateEnum.STARTING); + SparkAppHttpWatcher.doWatching(application); // update app updateById(application); @@ -487,7 +423,7 @@ private boolean checkAppRepeatInYarn(String jobName) { } private void starting(SparkApplication application) { - application.setState(FlinkAppStateEnum.STARTING.getValue()); + application.setState(SparkAppStateEnum.STARTING.getValue()); application.setOptionTime(new Date()); updateById(application); } @@ -508,8 +444,7 @@ private Tuple2 getUserJarAndAppConf( FlinkSql flinkSql = flinkSqlService.getEffective(application.getId(), false); AssertUtils.notNull(flinkSql); // 1) dist_userJar - // todo - String sqlDistJar = serviceHelper.getFlinkSqlClientJar(null); + String sqlDistJar = serviceHelper.getSparkSqlClientJar(sparkEnv); // 2) appConfig appConf = applicationConfig == null ? null @@ -598,18 +533,7 @@ private Tuple2 getUserJarAndAppConf( private Map getProperties(SparkApplication application) { Map properties = new HashMap<>(application.getOptionMap()); - if (SparkExecutionMode.isRemoteMode(application.getSparkExecutionMode())) { - FlinkCluster cluster = flinkClusterService.getById(application.getSparkClusterId()); - ApiAlertException.throwIfNull( - cluster, - String.format( - "The clusterId=%s can't be find, maybe the clusterId is wrong or " - + "the cluster has been deleted. Please contact the Admin.", - application.getSparkClusterId())); - URI activeAddress = cluster.getRemoteURI(); - properties.put(RestOptions.ADDRESS.key(), activeAddress.getHost()); - properties.put(RestOptions.PORT.key(), activeAddress.getPort()); - } else if (SparkExecutionMode.isYarnMode(application.getSparkExecutionMode())) { + if (SparkExecutionMode.isYarnMode(application.getSparkExecutionMode())) { String yarnQueue = (String) application.getHotParamsMap().get(ConfigKeys.KEY_YARN_APP_QUEUE()); String yarnLabelExpr = (String) application.getHotParamsMap().get(ConfigKeys.KEY_YARN_APP_NODE_LABEL()); Optional.ofNullable(yarnQueue) @@ -618,29 +542,19 @@ private Map getProperties(SparkApplication application) { .ifPresent(yLabel -> properties.put(ConfigKeys.KEY_YARN_APP_NODE_LABEL(), yLabel)); } - if (application.getAllowNonRestored()) { - properties.put(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE.key(), true); - } - Map dynamicProperties = PropertiesUtils .extractDynamicPropertiesAsJava(application.getDynamicProperties()); properties.putAll(dynamicProperties); - ResolveOrder resolveOrder = ResolveOrder.of(application.getResolveOrder()); - if (resolveOrder != null) { - properties.put(CoreOptions.CLASSLOADER_RESOLVE_ORDER.key(), resolveOrder.getName()); - } - return properties; } private void doStopped(Long id) { SparkApplication application = getById(id); - application.setOptionState(OptionStateEnum.NONE.getValue()); - application.setState(FlinkAppStateEnum.CANCELED.getValue()); + application.setOptionState(SparkOptionStateEnum.NONE.getValue()); + application.setState(SparkAppStateEnum.KILLED.getValue()); application.setOptionTime(new Date()); updateById(application); - // re-tracking flink job on kubernetes and logging exception - FlinkAppHttpWatcher.unWatching(application.getId()); + SparkAppHttpWatcher.unWatching(application.getId()); // kill application if (SparkExecutionMode.isYarnMode(application.getSparkExecutionMode())) { try { @@ -655,18 +569,14 @@ private void doStopped(Long id) { } } - /* check flink cluster before job start job */ - private void checkBeforeStart(SparkApplication application) { - SparkEnv sparkEnv = sparkEnvService.getByAppId(application.getId()); - ApiAlertException.throwIfNull(sparkEnv, "[StreamPark] can no found flink version"); - + /* check yarn cluster before job start job */ + private void checkYarnBeforeStart(SparkApplication application) { + STATE yarnState = HadoopUtils.yarnClient().getServiceState(); ApiAlertException.throwIfFalse( - flinkClusterService.existsByFlinkEnvId(sparkEnv.getId()), - "[StreamPark] The flink cluster don't exist, please check it"); - - FlinkCluster flinkCluster = flinkClusterService.getById(application.getSparkClusterId()); - ApiAlertException.throwIfFalse( - flinkClusterWatcher.getClusterState(flinkCluster) == ClusterState.RUNNING, - "[StreamPark] The flink cluster not running, please start it"); + yarnState == STARTED, + "[StreamPark] The yarn cluster service state is " + yarnState.name() + ", please check it"); + ApiAlertException.throwIfTrue( + !applicationInfoService.getYarnAppReport(application.getJobName()).isEmpty(), + "[StreamPark] The same task name is already running in the yarn queue"); } } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationInfoServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationInfoServiceImpl.java index 58a5bb1259..4d4aba24e3 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationInfoServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationInfoServiceImpl.java @@ -26,27 +26,19 @@ import org.apache.streampark.common.util.HadoopUtils; import org.apache.streampark.common.util.Utils; import org.apache.streampark.common.util.YarnUtils; -import org.apache.streampark.console.base.exception.ApiAlertException; import org.apache.streampark.console.base.exception.ApiDetailException; import org.apache.streampark.console.base.exception.ApplicationException; -import org.apache.streampark.console.core.entity.Application; -import org.apache.streampark.console.core.entity.FlinkCluster; import org.apache.streampark.console.core.entity.SparkApplication; import org.apache.streampark.console.core.entity.SparkEnv; import org.apache.streampark.console.core.enums.AppExistsStateEnum; import org.apache.streampark.console.core.enums.FlinkAppStateEnum; +import org.apache.streampark.console.core.enums.SparkAppStateEnum; import org.apache.streampark.console.core.mapper.SparkApplicationMapper; -import org.apache.streampark.console.core.metrics.flink.JobsOverview; import org.apache.streampark.console.core.runner.EnvInitializer; -import org.apache.streampark.console.core.service.FlinkClusterService; -import org.apache.streampark.console.core.service.SavePointService; import org.apache.streampark.console.core.service.SparkEnvService; import org.apache.streampark.console.core.service.application.SparkApplicationInfoService; -import org.apache.streampark.console.core.watcher.FlinkAppHttpWatcher; -import org.apache.streampark.console.core.watcher.FlinkClusterWatcher; +import org.apache.streampark.console.core.watcher.SparkAppHttpWatcher; import org.apache.streampark.flink.core.conf.ParameterCli; -import org.apache.streampark.flink.kubernetes.FlinkK8sWatcher; -import org.apache.streampark.flink.kubernetes.model.FlinkMetricCV; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.yarn.api.records.ApplicationReport; @@ -96,94 +88,16 @@ public class SparkApplicationInfoServiceImpl @Autowired private SparkEnvService sparkEnvService; - @Autowired - private SavePointService savePointService; - @Autowired private EnvInitializer envInitializer; - @Autowired - private FlinkK8sWatcher k8SFlinkTrackMonitor; - - @Autowired - private FlinkClusterService flinkClusterService; - - @Autowired - private FlinkClusterWatcher flinkClusterWatcher; - @Override public Map getDashboardDataMap(Long teamId) { - JobsOverview.Task overview = new JobsOverview.Task(); - Integer totalJmMemory = 0; - Integer totalTmMemory = 0; - Integer totalTm = 0; - Integer totalSlot = 0; - Integer availableSlot = 0; - Integer runningJob = 0; - - // stat metrics from other than kubernetes mode - for (Application app : FlinkAppHttpWatcher.getWatchingApps()) { - if (!teamId.equals(app.getTeamId())) { - continue; - } - if (app.getJmMemory() != null) { - totalJmMemory += app.getJmMemory(); - } - if (app.getTmMemory() != null) { - totalTmMemory += app.getTmMemory() * (app.getTotalTM() == null ? 1 : app.getTotalTM()); - } - if (app.getTotalTM() != null) { - totalTm += app.getTotalTM(); - } - if (app.getTotalSlot() != null) { - totalSlot += app.getTotalSlot(); - } - if (app.getAvailableSlot() != null) { - availableSlot += app.getAvailableSlot(); - } - if (app.getState() == FlinkAppStateEnum.RUNNING.getValue()) { - runningJob++; - } - JobsOverview.Task task = app.getOverview(); - if (task != null) { - overview.setTotal(overview.getTotal() + task.getTotal()); - overview.setCreated(overview.getCreated() + task.getCreated()); - overview.setScheduled(overview.getScheduled() + task.getScheduled()); - overview.setDeploying(overview.getDeploying() + task.getDeploying()); - overview.setRunning(overview.getRunning() + task.getRunning()); - overview.setFinished(overview.getFinished() + task.getFinished()); - overview.setCanceling(overview.getCanceling() + task.getCanceling()); - overview.setCanceled(overview.getCanceled() + task.getCanceled()); - overview.setFailed(overview.getFailed() + task.getFailed()); - overview.setReconciling(overview.getReconciling() + task.getReconciling()); - } - } - - // merge metrics from flink kubernetes cluster - FlinkMetricCV k8sMetric = k8SFlinkTrackMonitor.getAccGroupMetrics(teamId.toString()); - if (k8sMetric != null) { - totalJmMemory += k8sMetric.totalJmMemory(); - totalTmMemory += k8sMetric.totalTmMemory(); - totalTm += k8sMetric.totalTm(); - totalSlot += k8sMetric.totalSlot(); - availableSlot += k8sMetric.availableSlot(); - runningJob += k8sMetric.runningJob(); - overview.setTotal(overview.getTotal() + k8sMetric.totalJob()); - overview.setRunning(overview.getRunning() + k8sMetric.runningJob()); - overview.setFinished(overview.getFinished() + k8sMetric.finishedJob()); - overview.setCanceled(overview.getCanceled() + k8sMetric.cancelledJob()); - overview.setFailed(overview.getFailed() + k8sMetric.failedJob()); - } // result json Map dashboardDataMap = new HashMap<>(8); - dashboardDataMap.put("task", overview); - dashboardDataMap.put("jmMemory", totalJmMemory); - dashboardDataMap.put("tmMemory", totalTmMemory); - dashboardDataMap.put("totalTM", totalTm); - dashboardDataMap.put("availableSlot", availableSlot); - dashboardDataMap.put("totalSlot", totalSlot); - dashboardDataMap.put("runningJob", runningJob); + // TODO: Tasks running metrics for presentation + // dashboardDataMap.put("metrics key", "metrics value"); return dashboardDataMap; } @@ -203,14 +117,6 @@ public boolean checkEnv(SparkApplication appParam) throws ApplicationException { } envInitializer.checkSparkEnv(application.getStorageType(), sparkEnv); envInitializer.storageInitialize(application.getStorageType()); - - if (SparkExecutionMode.REMOTE == application.getSparkExecutionMode()) { - FlinkCluster flinkCluster = flinkClusterService.getById(application.getSparkClusterId()); - boolean conned = flinkClusterWatcher.verifyClusterConnection(flinkCluster); - if (!conned) { - throw new ApiAlertException("the target cluster is unavailable, please check!"); - } - } return true; } catch (Exception e) { log.error(ExceptionUtils.stringifyException(e)); @@ -221,10 +127,10 @@ public boolean checkEnv(SparkApplication appParam) throws ApplicationException { @Override public boolean checkAlter(SparkApplication appParam) { Long appId = appParam.getId(); - if (FlinkAppStateEnum.CANCELED != appParam.getStateEnum()) { + if (SparkAppStateEnum.KILLED != appParam.getStateEnum()) { return false; } - long cancelUserId = FlinkAppHttpWatcher.getCanceledJobUserId(appId); + long cancelUserId = SparkAppHttpWatcher.getCanceledJobUserId(appId); long appUserId = appParam.getUserId(); return cancelUserId != -1 && cancelUserId != appUserId; } @@ -241,37 +147,6 @@ public boolean existsByUserId(Long userId) { new LambdaQueryWrapper().eq(SparkApplication::getUserId, userId)); } - @Override - public boolean existsRunningByClusterId(Long clusterId) { - return baseMapper.existsRunningJobByClusterId(clusterId) - || FlinkAppHttpWatcher.getWatchingApps().stream() - .anyMatch( - application -> clusterId.equals(application.getFlinkClusterId()) - && FlinkAppStateEnum.RUNNING == application - .getStateEnum()); - } - - @Override - public boolean existsByClusterId(Long clusterId) { - return baseMapper.exists( - new LambdaQueryWrapper() - .eq(SparkApplication::getSparkClusterId, clusterId)); - } - - @Override - public Integer countByClusterId(Long clusterId) { - return baseMapper - .selectCount( - new LambdaQueryWrapper() - .eq(SparkApplication::getSparkClusterId, clusterId)) - .intValue(); - } - - @Override - public Integer countAffectedByClusterId(Long clusterId, String dbType) { - return baseMapper.countAffectedByClusterId(clusterId, dbType); - } - @Override public boolean existsBySparkEnvId(Long sparkEnvId) { return baseMapper.exists( diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SparkAppBuildPipeServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SparkAppBuildPipeServiceImpl.java index 94b9abf8a7..36d63d0375 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SparkAppBuildPipeServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SparkAppBuildPipeServiceImpl.java @@ -32,11 +32,11 @@ import org.apache.streampark.console.core.bean.Dependency; import org.apache.streampark.console.core.entity.AppBuildPipeline; import org.apache.streampark.console.core.entity.ApplicationConfig; -import org.apache.streampark.console.core.entity.ApplicationLog; import org.apache.streampark.console.core.entity.FlinkSql; import org.apache.streampark.console.core.entity.Message; import org.apache.streampark.console.core.entity.Resource; import org.apache.streampark.console.core.entity.SparkApplication; +import org.apache.streampark.console.core.entity.SparkApplicationLog; import org.apache.streampark.console.core.entity.SparkEnv; import org.apache.streampark.console.core.enums.CandidateTypeEnum; import org.apache.streampark.console.core.enums.NoticeTypeEnum; @@ -46,17 +46,17 @@ import org.apache.streampark.console.core.mapper.ApplicationBuildPipelineMapper; import org.apache.streampark.console.core.service.ApplicationBackUpService; import org.apache.streampark.console.core.service.ApplicationConfigService; -import org.apache.streampark.console.core.service.ApplicationLogService; import org.apache.streampark.console.core.service.FlinkSqlService; import org.apache.streampark.console.core.service.MessageService; import org.apache.streampark.console.core.service.ResourceService; import org.apache.streampark.console.core.service.ServiceHelper; import org.apache.streampark.console.core.service.SettingService; import org.apache.streampark.console.core.service.SparkAppBuildPipeService; +import org.apache.streampark.console.core.service.SparkApplicationLogService; import org.apache.streampark.console.core.service.SparkEnvService; import org.apache.streampark.console.core.service.application.SparkApplicationInfoService; import org.apache.streampark.console.core.service.application.SparkApplicationManageService; -import org.apache.streampark.console.core.watcher.FlinkAppHttpWatcher; +import org.apache.streampark.console.core.watcher.SparkAppHttpWatcher; import org.apache.streampark.flink.packer.maven.Artifact; import org.apache.streampark.flink.packer.maven.DependencyInfo; import org.apache.streampark.flink.packer.pipeline.BuildPipeline; @@ -129,10 +129,10 @@ public class SparkAppBuildPipeServiceImpl private SparkApplicationInfoService applicationInfoService; @Autowired - private ApplicationLogService applicationLogService; + private SparkApplicationLogService applicationLogService; @Autowired - private FlinkAppHttpWatcher flinkAppHttpWatcher; + private SparkAppHttpWatcher sparkAppHttpWatcher; @Autowired private ApplicationConfigService applicationConfigService; @@ -157,7 +157,7 @@ public boolean buildApplication(@Nonnull Long appId, boolean forceBuild) { checkBuildEnv(appId, forceBuild); SparkApplication app = applicationManageService.getById(appId); - ApplicationLog applicationLog = new ApplicationLog(); + SparkApplicationLog applicationLog = new SparkApplicationLog(); applicationLog.setOptionName(RELEASE.getValue()); applicationLog.setAppId(app.getId()); applicationLog.setOptionTime(new Date()); @@ -202,8 +202,8 @@ public void onStart(PipelineSnapshot snapshot) { app.setRelease(ReleaseStateEnum.RELEASING.get()); applicationManageService.updateRelease(app); - if (flinkAppHttpWatcher.isWatchingApp(app.getId())) { - flinkAppHttpWatcher.init(); + if (sparkAppHttpWatcher.isWatchingApp(app.getId())) { + sparkAppHttpWatcher.init(); } // 1) checkEnv @@ -331,8 +331,8 @@ public void onFinish(PipelineSnapshot snapshot, BuildResult result) { } applicationManageService.updateRelease(app); applicationLogService.save(applicationLog); - if (flinkAppHttpWatcher.isWatchingApp(app.getId())) { - flinkAppHttpWatcher.init(); + if (sparkAppHttpWatcher.isWatchingApp(app.getId())) { + sparkAppHttpWatcher.init(); } } }); diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SparkApplicationLogServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SparkApplicationLogServiceImpl.java new file mode 100644 index 0000000000..cb6e338af1 --- /dev/null +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SparkApplicationLogServiceImpl.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.console.core.service.impl; + +import org.apache.streampark.console.base.domain.RestRequest; +import org.apache.streampark.console.base.mybatis.pager.MybatisPager; +import org.apache.streampark.console.core.entity.SparkApplicationLog; +import org.apache.streampark.console.core.mapper.SparkApplicationLogMapper; +import org.apache.streampark.console.core.service.SparkApplicationLogService; + +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.baomidou.mybatisplus.core.metadata.IPage; +import com.baomidou.mybatisplus.extension.plugins.pagination.Page; +import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Propagation; +import org.springframework.transaction.annotation.Transactional; + +@Slf4j +@Service +@Transactional(propagation = Propagation.SUPPORTS, readOnly = true, rollbackFor = Exception.class) +public class SparkApplicationLogServiceImpl extends ServiceImpl + implements + SparkApplicationLogService { + + @Override + public IPage getPage(SparkApplicationLog sparkApplicationLog, RestRequest request) { + request.setSortField("option_time"); + Page page = MybatisPager.getPage(request); + LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper() + .eq(SparkApplicationLog::getAppId, sparkApplicationLog.getAppId()); + return this.page(page, queryWrapper); + } + + @Override + public void removeByAppId(Long appId) { + LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper() + .eq(SparkApplicationLog::getAppId, appId); + this.remove(queryWrapper); + } +} diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/SparkAppHttpWatcher.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/SparkAppHttpWatcher.java new file mode 100644 index 0000000000..c7e447cd22 --- /dev/null +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/SparkAppHttpWatcher.java @@ -0,0 +1,389 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.console.core.watcher; + +import org.apache.streampark.common.util.YarnUtils; +import org.apache.streampark.console.base.util.JacksonUtils; +import org.apache.streampark.console.base.util.Tuple2; +import org.apache.streampark.console.base.util.Tuple3; +import org.apache.streampark.console.core.bean.AlertTemplate; +import org.apache.streampark.console.core.entity.SparkApplication; +import org.apache.streampark.console.core.enums.SparkAppStateEnum; +import org.apache.streampark.console.core.enums.SparkOptionStateEnum; +import org.apache.streampark.console.core.enums.StopFromEnum; +import org.apache.streampark.console.core.metrics.spark.Job; +import org.apache.streampark.console.core.metrics.spark.SparkExecutor; +import org.apache.streampark.console.core.metrics.yarn.YarnAppInfo; +import org.apache.streampark.console.core.service.alert.AlertService; +import org.apache.streampark.console.core.service.application.SparkApplicationActionService; +import org.apache.streampark.console.core.service.application.SparkApplicationInfoService; +import org.apache.streampark.console.core.service.application.SparkApplicationManageService; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.hc.core5.util.Timeout; + +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; + +import java.io.IOException; +import java.time.Duration; +import java.util.Arrays; +import java.util.Collection; +import java.util.Date; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; + +@Slf4j +@Component +public class SparkAppHttpWatcher { + + @Autowired + private SparkApplicationManageService applicationManageService; + + @Autowired + private SparkApplicationActionService applicationActionService; + + @Autowired + private SparkApplicationInfoService applicationInfoService; + + @Autowired + private AlertService alertService; + + @Qualifier("sparkRestAPIWatchingExecutor") + @Autowired + private Executor executorService; + + // track interval every 5 seconds + public static final Duration WATCHING_INTERVAL = Duration.ofSeconds(5); + + // option interval within 10 seconds + private static final Duration OPTION_INTERVAL = Duration.ofSeconds(10); + + private static final Timeout HTTP_TIMEOUT = Timeout.ofSeconds(5); + + /** + * Record the status of the first tracking task, because after the task is started, the overview + * of the task will be obtained during the first tracking + */ + private static final Cache STARTING_CACHE = + Caffeine.newBuilder().expireAfterWrite(5, TimeUnit.MINUTES).build(); + + /** tracking task list */ + private static final Map WATCHING_APPS = new ConcurrentHashMap<>(0); + + /** + *
+     * StopFrom: Recording spark application stopped by streampark or stopped by other actions
+     * 
+ */ + private static final Map STOP_FROM_MAP = new ConcurrentHashMap<>(0); + + /** + * Task canceled tracking list, record who cancelled the tracking task Map + */ + private static final Map CANCELLED_JOB_MAP = new ConcurrentHashMap<>(0); + + private static final Map OPTIONING = new ConcurrentHashMap<>(0); + + private Long lastWatchTime = 0L; + + private Long lastOptionTime = 0L; + + private static final Byte DEFAULT_FLAG_BYTE = Byte.valueOf("0"); + + @PostConstruct + public void init() { + WATCHING_APPS.clear(); + List applications = + applicationManageService.list( + new LambdaQueryWrapper() + .eq(SparkApplication::getTracking, 1) + .ne(SparkApplication::getState, SparkAppStateEnum.LOST.getValue())); + applications.forEach( + (app) -> { + WATCHING_APPS.put(app.getId(), app); + STARTING_CACHE.put(app.getId(), DEFAULT_FLAG_BYTE); + }); + } + + @PreDestroy + public void doStop() { + log.info( + "[StreamPark][SparkAppHttpWatcher] StreamPark Console will be shutdown, persistent application to database."); + WATCHING_APPS.forEach((k, v) -> applicationManageService.persistMetrics(v)); + } + + /** + * NOTE: The following conditions must be met for execution + * + *

1) Program started or page operated task, such as start/stop, needs to return the + * state immediately. (the frequency of 1 second once, continued 10 seconds (10 times)) + * + *

2) Normal information obtain, once every 5 seconds + */ + @Scheduled(fixedDelay = 1000) + public void start() { + Long timeMillis = System.currentTimeMillis(); + if (lastWatchTime == null + || !OPTIONING.isEmpty() + || timeMillis - lastOptionTime <= OPTION_INTERVAL.toMillis() + || timeMillis - lastWatchTime >= WATCHING_INTERVAL.toMillis()) { + lastWatchTime = timeMillis; + WATCHING_APPS.forEach(this::watch); + } + } + + @VisibleForTesting + public @Nullable SparkAppStateEnum tryQuerySparkAppState(@Nonnull Long appId) { + SparkApplication app = WATCHING_APPS.get(appId); + return (app == null || app.getState() == null) ? null : app.getStateEnum(); + } + + private void watch(Long id, SparkApplication application) { + executorService.execute( + () -> { + try { + getStateFromYarn(application); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } + + private StopFromEnum getAppStopFrom(Long appId) { + return STOP_FROM_MAP.getOrDefault(appId, StopFromEnum.NONE); + } + + /** + * Query the job state from yarn and query the resource usage from spark when job state is RUNNING + * + * @param application spark application + */ + private void getStateFromYarn(SparkApplication application) throws Exception { + SparkOptionStateEnum optionStateEnum = OPTIONING.get(application.getId()); + + // query the status from the yarn rest Api + YarnAppInfo yarnAppInfo = httpYarnAppInfo(application); + if (yarnAppInfo == null) { + throw new RuntimeException("[StreamPark][SparkAppHttpWatcher] getStateFromYarn failed!"); + } else { + try { + String state = yarnAppInfo.getApp().getState(); + SparkAppStateEnum sparkAppStateEnum = SparkAppStateEnum.of(state); + if (SparkAppStateEnum.OTHER == sparkAppStateEnum) { + return; + } + if (SparkAppStateEnum.isEndState(sparkAppStateEnum.getValue())) { + log.info( + "[StreamPark][SparkAppHttpWatcher] getStateFromYarn, app {} was ended, jobId is {}, state is {}", + application.getId(), + application.getJobId(), + sparkAppStateEnum); + application.setEndTime(new Date()); + } + if (SparkAppStateEnum.RUNNING == sparkAppStateEnum) { + Tuple3 resourceStatus = getResourceStatus(application); + double memoryUsed = resourceStatus.t1; + double maxMemory = resourceStatus.t2; + double totalCores = resourceStatus.t3; + log.info( + "[StreamPark][SparkAppHttpWatcher] getStateFromYarn, app {} was running, jobId is {}, memoryUsed: {}MB, maxMemory: {}MB, totalCores: {}", + application.getId(), + application.getJobId(), + String.format("%.2f", memoryUsed), + String.format("%.2f", maxMemory), + totalCores); + // TODO: Modify the table structure to persist the results + } + application.setState(sparkAppStateEnum.getValue()); + cleanOptioning(optionStateEnum, application.getId()); + doPersistMetrics(application, false); + if (SparkAppStateEnum.FAILED == sparkAppStateEnum + || SparkAppStateEnum.LOST == sparkAppStateEnum + || applicationInfoService.checkAlter(application)) { + doAlert(application, sparkAppStateEnum); + if (SparkAppStateEnum.FAILED == sparkAppStateEnum) { + applicationActionService.start(application, true); + } + } + } catch (Exception e) { + throw new RuntimeException("[StreamPark][SparkAppHttpWatcher] getStateFromYarn failed!"); + } + } + } + + /** + * Calculate spark task progress from Spark rest api. (proxyed by yarn) Only available when yarn + * application status is RUNNING. + * + * @param application + * @return task progress + * @throws Exception + */ + private double getTasksProgress(SparkApplication application) throws Exception { + Job[] jobs = httpJobsStatus(application); + if (jobs.length == 0) { + return 0.0; + } + Optional> jobsSumOption = + Arrays.stream(jobs) + .map(job -> new Tuple2<>(job.getNumCompletedTasks(), job.getNumTasks())) + .reduce((val1, val2) -> new Tuple2<>(val1.t1 + val2.t1, val1.t2 + val2.t2)); + Tuple2 jobsSum = jobsSumOption.get(); + return jobsSum.t1 * 1.0 / jobsSum.t2; + } + + private Tuple3 getResourceStatus(SparkApplication application) throws Exception { + SparkExecutor[] executors = httpExecutorsStatus(application); + if (executors.length == 0) { + return new Tuple3<>(0.0, 0.0, 0L); + } + SparkExecutor totalExecutor = + Arrays.stream(executors) + .reduce( + (e1, e2) -> { + SparkExecutor temp = new SparkExecutor(); + temp.setMemoryUsed(e1.getMemoryUsed() + e2.getMemoryUsed()); + temp.setMaxMemory(e1.getMaxMemory() + e2.getMaxMemory()); + temp.setTotalCores(e1.getTotalCores() + e2.getTotalCores()); + return temp; + }) + .get(); + return new Tuple3<>( + totalExecutor.getMemoryUsed() * 1.0 / 1024 / 1024, + totalExecutor.getMaxMemory() * 1.0 / 1024 / 1024, + totalExecutor.getTotalCores()); + } + + private void doPersistMetrics(SparkApplication application, boolean stopWatch) { + if (SparkAppStateEnum.isEndState(application.getState())) { + application.setOverview(null); + application.setTotalTM(null); + application.setTotalSlot(null); + application.setTotalTask(null); + application.setAvailableSlot(null); + application.setJmMemory(null); + application.setTmMemory(null); + unWatching(application.getId()); + } else if (stopWatch) { + unWatching(application.getId()); + } else { + WATCHING_APPS.put(application.getId(), application); + } + applicationManageService.persistMetrics(application); + } + + private void cleanOptioning(SparkOptionStateEnum optionStateEnum, Long key) { + if (optionStateEnum != null) { + lastOptionTime = System.currentTimeMillis(); + OPTIONING.remove(key); + } + } + + /** set current option state */ + public static void setOptionState(Long appId, SparkOptionStateEnum state) { + log.info("[StreamPark][SparkAppHttpWatcher] setOptioning"); + OPTIONING.put(appId, state); + if (SparkOptionStateEnum.STOPPING == state) { + STOP_FROM_MAP.put(appId, StopFromEnum.STREAMPARK); + } + } + + public static void doWatching(SparkApplication application) { + log.info( + "[StreamPark][SparkAppHttpWatcher] add app to tracking, appId:{}", application.getId()); + WATCHING_APPS.put(application.getId(), application); + STARTING_CACHE.put(application.getId(), DEFAULT_FLAG_BYTE); + } + + public static void unWatching(Long appId) { + log.info("[StreamPark][SparkAppHttpWatcher] stop app, appId:{}", appId); + WATCHING_APPS.remove(appId); + } + + public static void addCanceledApp(Long appId, Long userId) { + log.info( + "[StreamPark][SparkAppHttpWatcher] addCanceledApp app appId:{}, useId:{}", appId, userId); + CANCELLED_JOB_MAP.put(appId, userId); + } + + public static Long getCanceledJobUserId(Long appId) { + return CANCELLED_JOB_MAP.get(appId) == null ? Long.valueOf(-1) : CANCELLED_JOB_MAP.get(appId); + } + + public static Collection getWatchingApps() { + return WATCHING_APPS.values(); + } + + private YarnAppInfo httpYarnAppInfo(SparkApplication application) throws Exception { + String reqURL = "ws/v1/cluster/apps/".concat(application.getJobId()); + return yarnRestRequest(reqURL, YarnAppInfo.class); + } + + private Job[] httpJobsStatus(SparkApplication application) throws Exception { + String format = "proxy/%s/api/v1/applications/%s/jobs"; + String reqURL = String.format(format, application.getJobId(), application.getJobId()); + return yarnRestRequest(reqURL, Job[].class); + } + + private SparkExecutor[] httpExecutorsStatus(SparkApplication application) throws Exception { + // "executor" is used for active executors only. + // "allexecutor" is used for all executors including the dead. + String format = "proxy/%s/api/v1/applications/%s/executors"; + String reqURL = String.format(format, application.getJobId(), application.getJobId()); + return yarnRestRequest(reqURL, SparkExecutor[].class); + } + + private T yarnRestRequest(String url, Class clazz) throws IOException { + String result = YarnUtils.restRequest(url, HTTP_TIMEOUT); + if (null == result) { + return null; + } + return JacksonUtils.read(result, clazz); + } + + public boolean isWatchingApp(Long id) { + return WATCHING_APPS.containsKey(id); + } + + /** + * Describes the alarming behavior under abnormal operation for jobs running in yarn mode. + * + * @param application spark application + * @param appState spark application state + */ + private void doAlert(SparkApplication application, SparkAppStateEnum appState) { + AlertTemplate alertTemplate = AlertTemplate.of(application, appState); + alertService.alert(application.getAlertId(), alertTemplate); + } +} diff --git a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/SparkClient.scala b/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/SparkClient.scala index 32d09b8568..0abc5f4c92 100644 --- a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/SparkClient.scala +++ b/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/SparkClient.scala @@ -32,15 +32,15 @@ object SparkClient extends Logger { private[this] val SUBMIT_REQUEST = "org.apache.streampark.spark.client.bean.SubmitRequest" -> "submit" - private[this] val CANCEL_REQUEST = - "org.apache.streampark.spark.client.bean.CancelRequest" -> "cancel" + private[this] val STOP_REQUEST = + "org.apache.streampark.spark.client.bean.StopRequest" -> "stop" def submit(submitRequest: SubmitRequest): SubmitResponse = { proxy[SubmitResponse](submitRequest, submitRequest.sparkVersion, SUBMIT_REQUEST) } - def cancel(stopRequest: CancelRequest): CancelResponse = { - proxy[CancelResponse](stopRequest, stopRequest.sparkVersion, CANCEL_REQUEST) + def stop(stopRequest: StopRequest): StopResponse = { + proxy[StopResponse](stopRequest, stopRequest.sparkVersion, STOP_REQUEST) } private[this] def proxy[T: ClassTag]( diff --git a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/CancelRequest.scala b/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/StopRequest.scala similarity index 96% rename from streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/CancelRequest.scala rename to streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/StopRequest.scala index 6b8e1bfa25..4e2ab56bc0 100644 --- a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/CancelRequest.scala +++ b/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/StopRequest.scala @@ -24,12 +24,11 @@ import javax.annotation.Nullable import java.util.{Map => JavaMap} -case class CancelRequest( +case class StopRequest( id: Long, sparkVersion: SparkVersion, executionMode: SparkExecutionMode, @Nullable properties: JavaMap[String, Any], - clusterId: String, jobId: String, withDrain: Boolean, nativeFormat: Boolean) diff --git a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/CancelResponse.scala b/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/StopResponse.scala similarity index 94% rename from streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/CancelResponse.scala rename to streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/StopResponse.scala index d293947daf..c8655d19b4 100644 --- a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/CancelResponse.scala +++ b/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/StopResponse.scala @@ -17,4 +17,4 @@ package org.apache.streampark.spark.client.bean -case class CancelResponse(savePointDir: String) +case class StopResponse(savePointDir: String) diff --git a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/SubmitResponse.scala b/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/SubmitResponse.scala index d2582dd53a..5ea75af016 100644 --- a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/SubmitResponse.scala +++ b/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/SubmitResponse.scala @@ -24,5 +24,6 @@ import java.util.{Map => JavaMap} case class SubmitResponse( clusterId: String, sparkConfig: JavaMap[String, String], + var sparkAppId: String, @Nullable jobId: String = "", @Nullable jobManagerUrl: String = "") diff --git a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/conf/SparkConfiguration.scala b/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/conf/SparkConfiguration.scala new file mode 100644 index 0000000000..99e97d3b62 --- /dev/null +++ b/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/conf/SparkConfiguration.scala @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.spark.client.conf + +object SparkConfiguration { + val defaultParameters = Map[String, Any]( + "spark.driver.cores" -> "1", + "spark.driver.memory" -> "1g", + "spark.executor.cores" -> "1", + "spark.executor.memory" -> "1g") + +} diff --git a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/proxy/SparkShimsProxy.scala b/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/proxy/SparkShimsProxy.scala index 242806fb49..5cdfb90635 100644 --- a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/proxy/SparkShimsProxy.scala +++ b/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/proxy/SparkShimsProxy.scala @@ -49,13 +49,13 @@ object SparkShimsProxy extends Logger { private[this] lazy val SPARK_SHIMS_PREFIX = "streampark-spark-shims_spark" def proxy[T](sparkVersion: SparkVersion, func: ClassLoader => T): T = { - val shimsClassLoader = getSParkShimsClassLoader(sparkVersion) + val shimsClassLoader = getSparkShimsClassLoader(sparkVersion) ClassLoaderUtils .runAsClassLoader[T](shimsClassLoader, () => func(shimsClassLoader)) } def proxy[T](sparkVersion: SparkVersion, func: JavaFunc[ClassLoader, T]): T = { - val shimsClassLoader = getSParkShimsClassLoader(sparkVersion) + val shimsClassLoader = getSparkShimsClassLoader(sparkVersion) ClassLoaderUtils .runAsClassLoader[T](shimsClassLoader, () => func(shimsClassLoader)) } @@ -134,14 +134,14 @@ object SparkShimsProxy extends Logger { .runAsClassLoader[T](shimsClassLoader, () => func(shimsClassLoader)) } - private[this] def getSParkShimsClassLoader(sparkVersion: SparkVersion): ClassLoader = { + private[this] def getSparkShimsClassLoader(sparkVersion: SparkVersion): ClassLoader = { logInfo(s"add spark shims urls classloader,spark version: $sparkVersion") SHIMS_CLASS_LOADER_CACHE.getOrElseUpdate( s"${sparkVersion.fullVersion}", { // 1) spark/lib - val libURL = getSparkHomeLib(sparkVersion.sparkHome, "jars", !_.getName.startsWith("log4j")) - val shimsUrls = ListBuffer[URL](libURL: _*) + val libUrl = getSparkHomeLib(sparkVersion.sparkHome, "jars", f => !f.getName.startsWith("log4j") && !f.getName.startsWith("slf4j")) + val shimsUrls = ListBuffer[URL](libUrl: _*) // 2) add all shims jar addShimsUrls( diff --git a/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/SparkClientEndpoint.scala b/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/SparkClientEndpoint.scala index 55e4fe0810..437bf0ff2b 100644 --- a/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/SparkClientEndpoint.scala +++ b/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/SparkClientEndpoint.scala @@ -38,12 +38,12 @@ object SparkClientEndpoint { } } - def cancel(cancelRequest: CancelRequest): CancelResponse = { - clients.get(cancelRequest.executionMode) match { - case Some(client) => client.cancel(cancelRequest) + def stop(stopRequest: StopRequest): StopResponse = { + clients.get(stopRequest.executionMode) match { + case Some(client) => client.stop(stopRequest) case _ => throw new UnsupportedOperationException( - s"Unsupported ${cancelRequest.executionMode} cancel ") + s"Unsupported ${stopRequest.executionMode} stop ") } } diff --git a/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/impl/YarnApplicationClient.scala b/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/impl/YarnApplicationClient.scala index 8f0a00e9a2..9870799c0d 100644 --- a/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/impl/YarnApplicationClient.scala +++ b/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/impl/YarnApplicationClient.scala @@ -18,17 +18,19 @@ package org.apache.streampark.spark.client.impl import org.apache.streampark.common.conf.Workspace +import org.apache.streampark.common.enums.SparkExecutionMode +import org.apache.streampark.common.util.HadoopUtils import org.apache.streampark.flink.packer.pipeline.ShadedBuildResponse import org.apache.streampark.spark.client.`trait`.SparkClientTrait import org.apache.streampark.spark.client.bean._ +import org.apache.streampark.spark.client.conf.SparkConfiguration import org.apache.commons.collections.MapUtils +import org.apache.hadoop.yarn.api.records.ApplicationId import org.apache.spark.launcher.{SparkAppHandle, SparkLauncher} import java.util.concurrent.{CountDownLatch, Executors, ExecutorService} -import scala.util.control.Breaks.break - /** yarn application mode submit */ object YarnApplicationClient extends SparkClientTrait { @@ -36,7 +38,8 @@ object YarnApplicationClient extends SparkClientTrait { private[this] lazy val workspace = Workspace.remote - override def doCancel(cancelRequest: CancelRequest): CancelResponse = { + override def doStop(stopRequest: StopRequest): StopResponse = { + HadoopUtils.yarnClient.killApplication(ApplicationId.fromString(stopRequest.jobId)) null } @@ -44,11 +47,9 @@ object YarnApplicationClient extends SparkClientTrait { override def doSubmit(submitRequest: SubmitRequest): SubmitResponse = { launch(submitRequest) - null - } - private def launch(submitRequest: SubmitRequest): Unit = { + private def launch(submitRequest: SubmitRequest): SubmitResponse = { val launcher: SparkLauncher = new SparkLauncher() .setSparkHome(submitRequest.sparkVersion.sparkHome) .setAppResource(submitRequest.buildResult @@ -56,69 +57,61 @@ object YarnApplicationClient extends SparkClientTrait { .shadedJarPath) .setMainClass(submitRequest.appMain) .setMaster("yarn") - .setDeployMode("cluster") + .setDeployMode(submitRequest.executionMode match { + case SparkExecutionMode.YARN_CLIENT => "client" + case SparkExecutionMode.YARN_CLUSTER => "cluster" + case _ => + throw new IllegalArgumentException( + "[StreamPark][YarnApplicationClient] Yarn mode only support \"client\" and \"cluster\".") + + }) .setAppName(submitRequest.appName) - .setConf("spark.executor.memory", "5g") - .setConf("spark.executor.cores", "4") - .setConf("spark.num.executors", "1") .setConf( "spark.yarn.jars", submitRequest - .asInstanceOf[SubmitRequest] .hdfsWorkspace .sparkLib + "/*.jar") .setVerbose(true) + import scala.collection.JavaConverters._ + setDynamicProperties(launcher, submitRequest.properties.asScala.toMap) + + // TODO: Adds command line arguments for the application. + // launcher.addAppArgs() + if (MapUtils.isNotEmpty(submitRequest.extraParameter) && submitRequest.extraParameter .containsKey("sql")) { launcher.addAppArgs("--sql", submitRequest.extraParameter.get("sql").toString) } - logger.info("The spark task start") + logger.info("[StreamPark][YarnApplicationClient] The spark task start") + val cdlForApplicationId: CountDownLatch = new CountDownLatch(1) + var sparkAppHandle: SparkAppHandle = null threadPool.execute(new Runnable { override def run(): Unit = { try { val countDownLatch: CountDownLatch = new CountDownLatch(1) - val sparkAppHandle: SparkAppHandle = - launcher.startApplication(new SparkAppHandle.Listener() { - override def stateChanged(handle: SparkAppHandle): Unit = { - if (handle.getAppId != null) { - logInfo( - String.format("%s stateChanged :%s", handle.getAppId, handle.getState.toString)) - } else logger.info("stateChanged :{}", handle.getState.toString) - - if (SparkAppHandle.State.FAILED.toString == handle.getState.toString) { - logger.error("Task run failure stateChanged :{}", handle.getState.toString) + sparkAppHandle = launcher.startApplication(new SparkAppHandle.Listener() { + override def stateChanged(handle: SparkAppHandle): Unit = { + if (handle.getAppId != null) { + if (cdlForApplicationId.getCount != 0) { + cdlForApplicationId.countDown() } + logger.info("{} stateChanged :{}", Array(handle.getAppId, handle.getState.toString)) + } else logger.info("stateChanged :{}", handle.getState.toString) - if (handle.getState.isFinal) countDownLatch.countDown() + if (SparkAppHandle.State.FAILED.toString == handle.getState.toString) { + logger.error("Task run failure stateChanged :{}", handle.getState.toString) } - override def infoChanged(handle: SparkAppHandle): Unit = {} - }) - logger.info( - "The task is executing, current is get application id before,please wait ........") - var applicationId: String = null - while ({ - !(SparkAppHandle.State.RUNNING == sparkAppHandle.getState) - }) { - applicationId = sparkAppHandle.getAppId - if (applicationId != null) { - logInfo( - String.format( - "handle current state is %s, appid is %s", - sparkAppHandle.getState.toString, - applicationId)) - break // todo: break is not supported - + if (handle.getState.isFinal) { + countDownLatch.countDown() + } } - } - logInfo( - String.format( - "handle current state is %s, appid is %s", - sparkAppHandle.getState.toString, - applicationId)) + + override def infoChanged(handle: SparkAppHandle): Unit = {} + }) countDownLatch.await() } catch { case e: Exception => @@ -127,6 +120,23 @@ object YarnApplicationClient extends SparkClientTrait { } }) + cdlForApplicationId.await() + logger.info( + "[StreamPark][YarnApplicationClient] The task is executing, handle current state is {}, appid is {}", + Array(sparkAppHandle.getState.toString, sparkAppHandle.getAppId)) + SubmitResponse(null, null, sparkAppHandle.getAppId) + } + + private def setDynamicProperties(sparkLauncher: SparkLauncher, properties: Map[String, Any]): Unit = { + logger.info("[StreamPark][YarnApplicationClient] Spark launcher start configuration.") + val finalProperties: Map[String, Any] = SparkConfiguration.defaultParameters ++ properties + for ((k, v) <- finalProperties) { + if (k.startsWith("spark.")) { + sparkLauncher.setConf(k, v.toString) + } else { + logger.info("[StreamPark][YarnApplicationClient] \"{}\" doesn't start with \"spark.\". Skip it.", k) + } + } } } diff --git a/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/trait/SparkClientTrait.scala b/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/trait/SparkClientTrait.scala index 75d6ea74d5..93f32aad01 100644 --- a/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/trait/SparkClientTrait.scala +++ b/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/trait/SparkClientTrait.scala @@ -28,7 +28,7 @@ trait SparkClientTrait extends Logger { def submit(submitRequest: SubmitRequest): SubmitResponse = { logInfo( s""" - |--------------------------------------- spark job start --------------------------------------- + |--------------------------------------- spark job start ----------------------------------- | userSparkHome : ${submitRequest.sparkVersion.sparkHome} | sparkVersion : ${submitRequest.sparkVersion.version} | appName : ${submitRequest.appName} @@ -57,27 +57,25 @@ trait SparkClientTrait extends Logger { def setConfig(submitRequest: SubmitRequest): Unit @throws[Exception] - def cancel(cancelRequest: CancelRequest): CancelResponse = { + def stop(stopRequest: StopRequest): StopResponse = { logInfo( s""" - |----------------------------------------- spark job cancel -------------------------------- - | userSparkHome : ${cancelRequest.sparkVersion.sparkHome} - | sparkVersion : ${cancelRequest.sparkVersion.version} - | clusterId : ${cancelRequest.clusterId} - | withDrain : ${cancelRequest.withDrain} - | nativeFormat : ${cancelRequest.nativeFormat} - | appId : ${cancelRequest.clusterId} - | jobId : ${cancelRequest.jobId} + |----------------------------------------- spark job stop ---------------------------------- + | userSparkHome : ${stopRequest.sparkVersion.sparkHome} + | sparkVersion : ${stopRequest.sparkVersion.version} + | withDrain : ${stopRequest.withDrain} + | nativeFormat : ${stopRequest.nativeFormat} + | jobId : ${stopRequest.jobId} |------------------------------------------------------------------------------------------- |""".stripMargin) - doCancel(cancelRequest) + doStop(stopRequest) } @throws[Exception] def doSubmit(submitRequest: SubmitRequest): SubmitResponse @throws[Exception] - def doCancel(cancelRequest: CancelRequest): CancelResponse + def doStop(stopRequest: StopRequest): StopResponse }