Skip to content

Commit

Permalink
[Fix] submit an err job status alway is STARING
Browse files Browse the repository at this point in the history
  • Loading branch information
monrg committed Jan 9, 2024
1 parent b537c1f commit e1c386f
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ create table `t_flink_log` (
`exception` text collate utf8mb4_general_ci,
`option_time` datetime default null,
`option_name` tinyint default null,
`user_id` bigint default null comment 'operator user id',
primary key (`id`) using btree
) engine=innodb auto_increment=100000 default charset=utf8mb4 collate=utf8mb4_general_ci;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,13 @@ public void start(Application appParam, boolean auto) throws Exception {
String exception = ExceptionUtils.stringifyException(throwable);
applicationLog.setException(exception);
applicationLog.setSuccess(false);
applicationLogService.save(applicationLog);
try {
if (!applicationLogService.save(applicationLog)) {
log.error("save application log failed.");
}
} catch (Exception e) {
log.error("save application log failed.", e);
}
if (throwable instanceof CancellationException) {
doStopped(application);
} else {
Expand Down Expand Up @@ -578,8 +584,7 @@ public void start(Application appParam, boolean auto) throws Exception {
* @return
*/
private boolean checkAppRepeatInYarn(String jobName) {
try {
YarnClient yarnClient = HadoopUtils.yarnClient();
try (YarnClient yarnClient = HadoopUtils.yarnClient()) {
Set<String> types =
Sets.newHashSet(
ApplicationType.STREAMPARK_FLINK.getName(), ApplicationType.APACHE_FLINK.getName());
Expand Down Expand Up @@ -749,8 +754,7 @@ private Map<String, Object> getProperties(Application application) {
}

if (FlinkExecutionMode.isKubernetesApplicationMode(application.getExecutionMode())) {
try {
HadoopUtils.yarnClient();
try (YarnClient yarnClient = HadoopUtils.yarnClient()) {
properties.put(JobManagerOptions.ARCHIVE_DIR.key(), Workspace.ARCHIVES_FILE_PATH());
} catch (Exception e) {
// skip
Expand Down Expand Up @@ -793,8 +797,9 @@ private void doStopped(Application appParam) {
List<ApplicationReport> applications =
applicationInfoService.getYarnAppReport(application.getJobName());
if (!applications.isEmpty()) {
YarnClient yarnClient = HadoopUtils.yarnClient();
yarnClient.killApplication(applications.get(0).getApplicationId());
try (YarnClient yarnClient = HadoopUtils.yarnClient()) {
yarnClient.killApplication(applications.get(0).getApplicationId());
}
}
} catch (Exception ignored) {
}
Expand Down

0 comments on commit e1c386f

Please sign in to comment.