From c9ab619e3358a7f8e718a9d8e68b01c53347575b Mon Sep 17 00:00:00 2001 From: benjobs Date: Wed, 8 Nov 2023 01:40:38 +0800 Subject: [PATCH 1/8] [Bug] yarn application bug fixed --- .../service/impl/AppBuildPipeServiceImpl.java | 123 +++++++++++------- .../service/impl/ApplicationServiceImpl.java | 1 + 2 files changed, 75 insertions(+), 49 deletions(-) diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java index f6fb7e641f..11295412f0 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java @@ -19,8 +19,6 @@ import org.apache.streampark.common.conf.ConfigConst; import org.apache.streampark.common.conf.Workspace; -import org.apache.streampark.common.enums.ApplicationType; -import org.apache.streampark.common.enums.DevelopmentMode; import org.apache.streampark.common.enums.ExecutionMode; import org.apache.streampark.common.fs.FsOperator; import org.apache.streampark.common.util.FileUtils; @@ -52,6 +50,7 @@ import org.apache.streampark.console.core.service.SettingService; import org.apache.streampark.console.core.task.FlinkRESTAPIWatcher; import org.apache.streampark.flink.packer.docker.DockerConf; +import org.apache.streampark.flink.packer.maven.MavenTool; import org.apache.streampark.flink.packer.pipeline.BuildPipeline; import org.apache.streampark.flink.packer.pipeline.BuildResult; import org.apache.streampark.flink.packer.pipeline.DockerBuildSnapshot; @@ -87,10 +86,12 @@ import javax.annotation.Nonnull; import java.io.File; +import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; @@ -180,44 +181,7 @@ public void onStart(PipeSnapshot snapshot) { applicationService.checkEnv(app); // 2) some preparatory work - String appUploads = app.getWorkspace().APP_UPLOADS(); - - if (app.isCustomCodeJob()) { - // customCode upload jar to appHome... - FsOperator fsOperator = app.getFsOperator(); - if (app.isCICDJob()) { - String appHome = app.getAppHome(); - fsOperator.mkCleanDirs(appHome); - fsOperator.upload(app.getDistHome(), appHome); - } else { - File localJar = new File(WebUtils.getAppTempDir(), app.getJar()); - // upload jar copy to appHome - String uploadJar = appUploads.concat("/").concat(app.getJar()); - checkOrElseUploadJar(app.getFsOperator(), localJar, uploadJar, appUploads); - if (app.getApplicationType() == ApplicationType.STREAMPARK_FLINK) { - fsOperator.mkdirs(app.getAppLib()); - fsOperator.copy(uploadJar, app.getAppLib(), false, true); - } - } - } - - if (app.isFlinkSqlJob() || app.isUploadJob()) { - if (!app.getDependencyObject().getJar().isEmpty()) { - String localUploads = Workspace.local().APP_UPLOADS(); - // copy jar to local upload dir - for (String jar : app.getDependencyObject().getJar()) { - File localJar = new File(WebUtils.getAppTempDir(), jar); - File uploadJar = new File(localUploads, jar); - if (!localJar.exists() && !uploadJar.exists()) { - throw new ApiAlertException("Missing file: " + jar + ", please upload again"); - } - if (localJar.exists()) { - checkOrElseUploadJar( - FsOperator.lfs(), localJar, uploadJar.getAbsolutePath(), localUploads); - } - } - } - } + prepareJars(app); } @Override @@ -333,11 +297,6 @@ private BuildPipeline createPipelineInstance(@Nonnull Application app) { case YARN_APPLICATION: String yarnProvidedPath = app.getAppLib(); String localWorkspace = app.getLocalAppHome().concat("/lib"); - if (app.getDevelopmentMode().equals(DevelopmentMode.CUSTOM_CODE) - && app.getApplicationType().equals(ApplicationType.APACHE_FLINK)) { - yarnProvidedPath = app.getAppHome(); - localWorkspace = app.getLocalAppHome(); - } FlinkYarnApplicationBuildRequest yarnAppRequest = new FlinkYarnApplicationBuildRequest( app.getJobName(), @@ -408,6 +367,72 @@ private BuildPipeline createPipelineInstance(@Nonnull Application app) { } } + private void prepareJars(Application app) { + File localUploadDIR = new File(Workspace.local().APP_UPLOADS()); + if (!localUploadDIR.exists()) { + localUploadDIR.mkdirs(); + } + + // 1. copy jar to local upload dir + if (app.isFlinkSqlJob() || app.isUploadJob()) { + if (!app.getDependencyObject().getJar().isEmpty()) { + for (String jar : app.getDependencyObject().getJar()) { + File localJar = new File(WebUtils.getAppTempDir(), jar); + File uploadJar = new File(localUploadDIR, jar); + if (!localJar.exists() && !uploadJar.exists()) { + throw new ApiAlertException("Missing file: " + jar + ", please upload again"); + } + if (localJar.exists()) { + checkOrElseUploadJar( + FsOperator.lfs(), localJar, uploadJar.getAbsolutePath(), localUploadDIR); + } + } + } + } + + if (app.isCustomCodeJob()) { + // customCode upload jar to appHome... + FsOperator fsOperator = app.getFsOperator(); + if (app.isUploadJob()) { + + // 1). upload jar to local upload. + File uploadJar = new File(localUploadDIR, app.getJar()); + + checkOrElseUploadJar( + FsOperator.lfs(), + new File(WebUtils.getAppTempDir(), app.getJar()), + uploadJar.getAbsolutePath(), + localUploadDIR); + + if (app.getExecutionModeEnum() == ExecutionMode.YARN_APPLICATION) { + List jars = new ArrayList<>(0); + jars.add(uploadJar); + + // 2). jar dependency to local upload + if (!app.getDependencyObject().getJar().isEmpty()) { + for (String jar : app.getDependencyObject().getJar()) { + jars.add(new File(localUploadDIR, jar)); + } + } + + // 3. pom dependency to local upload + if (!app.getDependencyInfo().mavenArts().isEmpty()) { + Set dependJars = + MavenTool.resolveArtifactsAsJava(app.getDependencyInfo().mavenArts()); + jars.addAll(dependJars); + } + fsOperator.mkdirs(app.getAppLib()); + jars.forEach( + jar -> fsOperator.upload(jar.getAbsolutePath(), app.getAppLib(), false, true)); + } + } else { + String appHome = app.getAppHome(); + fsOperator.mkCleanDirs(appHome); + fsOperator.upload(app.getDistHome(), appHome); + } + } + } + /** copy from {@link ApplicationServiceImpl#start(Application, boolean)} */ private String retrieveFlinkUserJar(FlinkEnv flinkEnv, Application app) { switch (app.getDevelopmentMode()) { @@ -416,7 +441,7 @@ private String retrieveFlinkUserJar(FlinkEnv flinkEnv, Application app) { case STREAMPARK_FLINK: return String.format("%s/%s", app.getAppLib(), app.getModule().concat(".jar")); case APACHE_FLINK: - return String.format("%s/%s", WebUtils.getAppTempDir(), app.getJar()); + return String.format("%s/%s", Workspace.local().APP_UPLOADS(), app.getJar()); default: throw new IllegalArgumentException( "[StreamPark] unsupported ApplicationType of custom code: " @@ -487,13 +512,13 @@ public boolean saveEntity(AppBuildPipeline pipe) { } private void checkOrElseUploadJar( - FsOperator fsOperator, File localJar, String targetJar, String targetDir) { + FsOperator fsOperator, File localJar, String targetJar, File targetDir) { if (!fsOperator.exists(targetJar)) { - fsOperator.upload(localJar.getAbsolutePath(), targetDir, false, true); + fsOperator.upload(localJar.getAbsolutePath(), targetDir.getAbsolutePath(), false, true); } else { // The file exists to check whether it is consistent, and if it is inconsistent, re-upload it if (!FileUtils.equals(localJar, new File(targetJar))) { - fsOperator.upload(localJar.getAbsolutePath(), targetDir, false, true); + fsOperator.upload(localJar.getAbsolutePath(), targetDir.getAbsolutePath(), false, true); } } } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java index e23c125647..99f21fd592 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java @@ -819,6 +819,7 @@ public Long copy(Application appParam) { newApp.setJarCheckSum(oldApp.getJarCheckSum()); newApp.setTags(oldApp.getTags()); newApp.setTeamId(oldApp.getTeamId()); + newApp.setDependency(oldApp.getDependency()); boolean saved = save(newApp); if (saved) { From 6d0666b315fb3e1236a4e97c18128bfd7947e53a Mon Sep 17 00:00:00 2001 From: benjobs Date: Wed, 8 Nov 2023 02:01:52 +0800 Subject: [PATCH 2/8] [Bug] yarn application mode bug fixed. --- .../service/impl/AppBuildPipeServiceImpl.java | 2 - .../flink/packer/maven/MavenTool.scala | 5 ++ .../flink/packer/pipeline/BuildRequest.scala | 1 - .../FlinkYarnApplicationBuildPipeline.scala | 46 +++++++------------ 4 files changed, 21 insertions(+), 33 deletions(-) diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java index 11295412f0..41d4513356 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java @@ -296,12 +296,10 @@ private BuildPipeline createPipelineInstance(@Nonnull Application app) { switch (executionMode) { case YARN_APPLICATION: String yarnProvidedPath = app.getAppLib(); - String localWorkspace = app.getLocalAppHome().concat("/lib"); FlinkYarnApplicationBuildRequest yarnAppRequest = new FlinkYarnApplicationBuildRequest( app.getJobName(), mainClass, - localWorkspace, yarnProvidedPath, app.getDevelopmentMode(), app.getDependencyInfo()); diff --git a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenTool.scala b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenTool.scala index 5dab91892a..05562a870e 100644 --- a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenTool.scala +++ b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenTool.scala @@ -43,8 +43,10 @@ import javax.annotation.{Nonnull, Nullable} import java.io.File import java.util +import java.util.{HashSet, Set => JavaSet} import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer import scala.util.{Failure, Success, Try} @@ -176,6 +178,9 @@ object MavenTool extends Logger { buildFatJar(mainClass, jarLibs ++ artFilePaths, outFatJarPath) } + def resolveArtifactsAsJava(mavenArtifacts: Set[Artifact]): JavaSet[File] = resolveArtifacts( + mavenArtifacts).asJava + /** * Resolve the collectoin of artifacts, Artifacts will be download to ConfigConst.MAVEN_LOCAL_DIR * if necessary. notes: Only compile scope dependencies will be resolved. diff --git a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/BuildRequest.scala b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/BuildRequest.scala index 62f90935a2..5abc55b924 100644 --- a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/BuildRequest.scala +++ b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/BuildRequest.scala @@ -116,7 +116,6 @@ case class FlinkRemotePerJobBuildRequest( case class FlinkYarnApplicationBuildRequest( appName: String, mainClass: String, - localWorkspace: String, yarnProvidedPath: String, developmentMode: DevelopmentMode, dependencyInfo: DependencyInfo) diff --git a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkYarnApplicationBuildPipeline.scala b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkYarnApplicationBuildPipeline.scala index 92c1d91999..dcf6ad86f2 100644 --- a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkYarnApplicationBuildPipeline.scala +++ b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkYarnApplicationBuildPipeline.scala @@ -19,7 +19,7 @@ package org.apache.streampark.flink.packer.pipeline.impl import org.apache.streampark.common.conf.Workspace import org.apache.streampark.common.enums.DevelopmentMode -import org.apache.streampark.common.fs.{FsOperator, HdfsOperator, LfsOperator} +import org.apache.streampark.common.fs.{FsOperator, HdfsOperator} import org.apache.streampark.common.util.Utils import org.apache.streampark.flink.packer.maven.MavenTool import org.apache.streampark.flink.packer.pipeline._ @@ -45,9 +45,7 @@ class FlinkYarnApplicationBuildPipeline(request: FlinkYarnApplicationBuildReques override protected def buildProcess(): SimpleBuildResponse = { execStep(1) { request.developmentMode match { - case DevelopmentMode.FLINK_SQL => - LfsOperator.mkCleanDirs(request.localWorkspace) - HdfsOperator.mkCleanDirs(request.yarnProvidedPath) + case DevelopmentMode.FLINK_SQL => HdfsOperator.mkCleanDirs(request.yarnProvidedPath) case _ => } logInfo(s"recreate building workspace: ${request.yarnProvidedPath}") @@ -64,11 +62,7 @@ class FlinkYarnApplicationBuildPipeline(request: FlinkYarnApplicationBuildReques }.getOrElse(throw getError.exception) execStep(3) { - mavenJars.foreach( - jar => { - uploadToHdfs(FsOperator.lfs, jar, request.localWorkspace) - uploadToHdfs(FsOperator.hdfs, jar, request.yarnProvidedPath) - }) + mavenJars.foreach(jar => uploadToHdfs(FsOperator.hdfs, jar, request.yarnProvidedPath)) }.getOrElse(throw getError.exception) SimpleBuildResponse() @@ -82,29 +76,21 @@ class FlinkYarnApplicationBuildPipeline(request: FlinkYarnApplicationBuildReques } if (originFile.isFile) { // check file in upload dir - fsOperator match { - case FsOperator.lfs => - fsOperator.copy(originFile.getAbsolutePath, target) - case FsOperator.hdfs => - val uploadFile = s"${Workspace.remote.APP_UPLOADS}/${originFile.getName}" - if (fsOperator.exists(uploadFile)) { - Utils.using(new FileInputStream(originFile))( - inputStream => { - if (DigestUtils.md5Hex(inputStream) != fsOperator.fileMd5(uploadFile)) { - fsOperator.upload(originFile.getAbsolutePath, uploadFile) - } - }) - } else { - fsOperator.upload(originFile.getAbsolutePath, uploadFile) - } - // copy jar from upload dir to target dir - fsOperator.copy(uploadFile, target) + val uploadFile = s"${Workspace.remote.APP_UPLOADS}/${originFile.getName}" + if (fsOperator.exists(uploadFile)) { + Utils.using(new FileInputStream(originFile))( + inputStream => { + if (DigestUtils.md5Hex(inputStream) != fsOperator.fileMd5(uploadFile)) { + fsOperator.upload(originFile.getAbsolutePath, uploadFile) + } + }) + } else { + fsOperator.upload(originFile.getAbsolutePath, uploadFile) } + // copy jar from upload dir to target dir + fsOperator.copy(uploadFile, target) } else { - fsOperator match { - case FsOperator.hdfs => fsOperator.upload(originFile.getAbsolutePath, target) - case _ => - } + fsOperator.upload(originFile.getAbsolutePath, target) } } From ba65306e9550982422044ccd72c5bcce7dbc2d6f Mon Sep 17 00:00:00 2001 From: benjobs Date: Wed, 8 Nov 2023 23:28:55 +0800 Subject: [PATCH 3/8] [Improve] yarn application mode bug fixed. --- .../core/service/impl/ApplicationServiceImpl.java | 6 +++++- .../flink/client/impl/YarnApplicationClient.scala | 14 ++++++-------- 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java index 99f21fd592..f86cb26284 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java @@ -1500,7 +1500,11 @@ public void start(Application appParam, boolean auto) throws Exception { "%s/%s", application.getAppLib(), application.getModule().concat(".jar")); break; case APACHE_FLINK: - flinkUserJar = String.format("%s/%s", application.getAppHome(), application.getJar()); + if (application.getFsOperator().exists(application.getAppLib())) { + flinkUserJar = String.format("%s/%s", application.getAppLib(), application.getJar()); + } else { + flinkUserJar = String.format("%s/%s", application.getAppHome(), application.getJar()); + } break; default: throw new IllegalArgumentException( diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala index 2b981b3054..b84aae3224 100644 --- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala @@ -66,14 +66,12 @@ object YarnApplicationClient extends YarnClientTrait { submitRequest.hdfsWorkspace.appJars, submitRequest.hdfsWorkspace.appPlugins ) - submitRequest.developmentMode match { - case DevelopmentMode.FLINK_SQL => - array += s"${workspace.APP_SHIMS}/flink-${submitRequest.flinkVersion.majorVersion}" - val jobLib = s"${workspace.APP_WORKSPACE}/${submitRequest.id}/lib" - if (HdfsUtils.exists(jobLib)) { - array += jobLib - } - case _ => + val jobLib = s"${workspace.APP_WORKSPACE}/${submitRequest.id}/lib" + if (HdfsUtils.exists(jobLib)) { + array += jobLib + } + if (submitRequest.developmentMode == DevelopmentMode.FLINK_SQL) { + array += s"${workspace.APP_SHIMS}/flink-${submitRequest.flinkVersion.majorVersion}" } array.toList } From f5b98d9b48f47742234f85b73b8c063ec0c2d3f3 Mon Sep 17 00:00:00 2001 From: benjobs Date: Thu, 9 Nov 2023 08:35:49 +0800 Subject: [PATCH 4/8] [Bug] upload job to workspace bug fixed --- .../service/impl/AppBuildPipeServiceImpl.java | 17 +++++++---------- .../service/impl/ApplicationServiceImpl.java | 11 +---------- 2 files changed, 8 insertions(+), 20 deletions(-) diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java index 41d4513356..6accb452ba 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java @@ -381,8 +381,7 @@ private void prepareJars(Application app) { throw new ApiAlertException("Missing file: " + jar + ", please upload again"); } if (localJar.exists()) { - checkOrElseUploadJar( - FsOperator.lfs(), localJar, uploadJar.getAbsolutePath(), localUploadDIR); + checkOrElseUploadJar(FsOperator.lfs(), localJar, uploadJar, localUploadDIR); } } } @@ -394,13 +393,11 @@ private void prepareJars(Application app) { if (app.isUploadJob()) { // 1). upload jar to local upload. + File localJar = new File(WebUtils.getAppTempDir(), app.getJar()); + File uploadJar = new File(localUploadDIR, app.getJar()); - checkOrElseUploadJar( - FsOperator.lfs(), - new File(WebUtils.getAppTempDir(), app.getJar()), - uploadJar.getAbsolutePath(), - localUploadDIR); + checkOrElseUploadJar(FsOperator.lfs(), localJar, uploadJar, localUploadDIR); if (app.getExecutionModeEnum() == ExecutionMode.YARN_APPLICATION) { List jars = new ArrayList<>(0); @@ -510,12 +507,12 @@ public boolean saveEntity(AppBuildPipeline pipe) { } private void checkOrElseUploadJar( - FsOperator fsOperator, File localJar, String targetJar, File targetDir) { - if (!fsOperator.exists(targetJar)) { + FsOperator fsOperator, File localJar, File targetJar, File targetDir) { + if (!fsOperator.exists(targetJar.getAbsolutePath())) { fsOperator.upload(localJar.getAbsolutePath(), targetDir.getAbsolutePath(), false, true); } else { // The file exists to check whether it is consistent, and if it is inconsistent, re-upload it - if (!FileUtils.equals(localJar, new File(targetJar))) { + if (!FileUtils.equals(localJar, targetJar)) { fsOperator.upload(localJar.getAbsolutePath(), targetDir.getAbsolutePath(), false, true); } } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java index f86cb26284..63e8141017 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java @@ -704,16 +704,7 @@ public boolean create(Application appParam) { ApiAlertException.throwIfNull( appParam.getTeamId(), "The teamId can't be null. Create application failed."); - if (appParam.isFlinkSqlJob()) { - appParam.setBuild(true); - } else { - if (appParam.isUploadJob()) { - appParam.setBuild(!appParam.getDependencyObject().isEmpty()); - } else { - appParam.setBuild(false); - } - } - + appParam.setBuild(true); appParam.setUserId(commonService.getUserId()); appParam.setState(FlinkAppState.ADDED.getValue()); appParam.setRelease(ReleaseState.NEED_RELEASE.get()); From f9875775477705a98ea97a83e412eafbc9c6e551 Mon Sep 17 00:00:00 2001 From: benjobs Date: Thu, 9 Nov 2023 10:25:24 +0800 Subject: [PATCH 5/8] [Improve] default hdfs workspace path improvement --- .../conf/streampark-console-config/application.yml | 2 +- .../core/service/impl/AppBuildPipeServiceImpl.java | 11 +++-------- .../src/main/resources/application.yml | 2 +- 3 files changed, 5 insertions(+), 10 deletions(-) diff --git a/deploy/helm/streampark/conf/streampark-console-config/application.yml b/deploy/helm/streampark/conf/streampark-console-config/application.yml index 309e7e6afe..2cee6fa8a5 100755 --- a/deploy/helm/streampark/conf/streampark-console-config/application.yml +++ b/deploy/helm/streampark/conf/streampark-console-config/application.yml @@ -97,7 +97,7 @@ streampark: # local workspace, used to store source code and build dir etc. workspace: local: /opt/streampark_workspace - remote: hdfs://hdfscluster/streampark # support hdfs:///streampark/ 、 /streampark 、hdfs://host:ip/streampark/ + remote: hdfs:///streampark # support hdfs:///streampark/ 、 /streampark 、hdfs://host:ip/streampark/ # remote docker register namespace for streampark docker: diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java index 6accb452ba..2b4cc92534 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java @@ -391,14 +391,10 @@ private void prepareJars(Application app) { // customCode upload jar to appHome... FsOperator fsOperator = app.getFsOperator(); if (app.isUploadJob()) { - // 1). upload jar to local upload. File localJar = new File(WebUtils.getAppTempDir(), app.getJar()); - File uploadJar = new File(localUploadDIR, app.getJar()); - checkOrElseUploadJar(FsOperator.lfs(), localJar, uploadJar, localUploadDIR); - if (app.getExecutionModeEnum() == ExecutionMode.YARN_APPLICATION) { List jars = new ArrayList<>(0); jars.add(uploadJar); @@ -417,8 +413,7 @@ private void prepareJars(Application app) { jars.addAll(dependJars); } fsOperator.mkdirs(app.getAppLib()); - jars.forEach( - jar -> fsOperator.upload(jar.getAbsolutePath(), app.getAppLib(), false, true)); + jars.forEach(jar -> fsOperator.upload(jar.getAbsolutePath(), app.getAppLib())); } } else { String appHome = app.getAppHome(); @@ -509,11 +504,11 @@ public boolean saveEntity(AppBuildPipeline pipe) { private void checkOrElseUploadJar( FsOperator fsOperator, File localJar, File targetJar, File targetDir) { if (!fsOperator.exists(targetJar.getAbsolutePath())) { - fsOperator.upload(localJar.getAbsolutePath(), targetDir.getAbsolutePath(), false, true); + fsOperator.upload(localJar.getAbsolutePath(), targetDir.getAbsolutePath()); } else { // The file exists to check whether it is consistent, and if it is inconsistent, re-upload it if (!FileUtils.equals(localJar, targetJar)) { - fsOperator.upload(localJar.getAbsolutePath(), targetDir.getAbsolutePath(), false, true); + fsOperator.upload(localJar.getAbsolutePath(), targetDir.getAbsolutePath()); } } } diff --git a/streampark-console/streampark-console-service/src/main/resources/application.yml b/streampark-console/streampark-console-service/src/main/resources/application.yml index db59bf85ea..52893833f0 100644 --- a/streampark-console/streampark-console-service/src/main/resources/application.yml +++ b/streampark-console/streampark-console-service/src/main/resources/application.yml @@ -97,7 +97,7 @@ streampark: # local workspace, used to store source code and build dir etc. workspace: local: /opt/streampark_workspace - remote: hdfs://hdfscluster/streampark # support hdfs:///streampark/ 、 /streampark 、hdfs://host:ip/streampark/ + remote: hdfs:///streampark # support hdfs:///streampark/ 、 /streampark 、hdfs://host:ip/streampark/ # remote docker register namespace for streampark docker: From 3cc157edb2e736e87cc17cc74d28413c17e938d0 Mon Sep 17 00:00:00 2001 From: benjobs Date: Thu, 9 Nov 2023 14:01:42 +0800 Subject: [PATCH 6/8] [Improve] flink yarn-app mode improvement --- .../service/impl/AppBuildPipeServiceImpl.java | 25 ++++++++++--------- .../client/impl/YarnApplicationClient.scala | 1 - 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java index 2b4cc92534..965f21e703 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java @@ -91,7 +91,6 @@ import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; @@ -390,29 +389,31 @@ private void prepareJars(Application app) { if (app.isCustomCodeJob()) { // customCode upload jar to appHome... FsOperator fsOperator = app.getFsOperator(); + if (app.isUploadJob()) { // 1). upload jar to local upload. File localJar = new File(WebUtils.getAppTempDir(), app.getJar()); File uploadJar = new File(localUploadDIR, app.getJar()); checkOrElseUploadJar(FsOperator.lfs(), localJar, uploadJar, localUploadDIR); + if (app.getExecutionModeEnum() == ExecutionMode.YARN_APPLICATION) { List jars = new ArrayList<>(0); + + // 1) user jar jars.add(uploadJar); - // 2). jar dependency to local upload - if (!app.getDependencyObject().getJar().isEmpty()) { - for (String jar : app.getDependencyObject().getJar()) { - jars.add(new File(localUploadDIR, jar)); - } - } + // 2). jar dependency + app.getDependencyObject() + .getJar() + .forEach(jar -> jars.add(new File(localUploadDIR, jar))); - // 3. pom dependency to local upload + // 3). pom dependency if (!app.getDependencyInfo().mavenArts().isEmpty()) { - Set dependJars = - MavenTool.resolveArtifactsAsJava(app.getDependencyInfo().mavenArts()); - jars.addAll(dependJars); + jars.addAll(MavenTool.resolveArtifactsAsJava(app.getDependencyInfo().mavenArts())); } - fsOperator.mkdirs(app.getAppLib()); + + fsOperator.mkCleanDirs(app.getAppLib()); + // upload jars to uploadDIR jars.forEach(jar -> fsOperator.upload(jar.getAbsolutePath(), app.getAppLib())); } } else { diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala index b84aae3224..26af8f8ab0 100644 --- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala @@ -62,7 +62,6 @@ object YarnApplicationClient extends YarnClientTrait { val providedLibs = { val array = ListBuffer( submitRequest.hdfsWorkspace.flinkLib, - submitRequest.hdfsWorkspace.flinkPlugins, submitRequest.hdfsWorkspace.appJars, submitRequest.hdfsWorkspace.appPlugins ) From 911ea66a0291b6fe31a932fd64eb175e62152c03 Mon Sep 17 00:00:00 2001 From: benjobs Date: Thu, 9 Nov 2023 18:16:01 +0800 Subject: [PATCH 7/8] [Improve] upload jar minor improvement --- .../console/core/entity/Application.java | 5 ++ .../service/impl/AppBuildPipeServiceImpl.java | 56 ++++++++++++------- .../service/impl/ApplicationServiceImpl.java | 1 + 3 files changed, 43 insertions(+), 19 deletions(-) diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java index 568f7f2a52..5b3aa4db28 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java @@ -439,6 +439,11 @@ public String getAppLib() { return getAppHome().concat("/lib"); } + @JsonIgnore + public String getLocalAppLib() { + return getLocalAppHome().concat("/lib"); + } + @JsonIgnore public ApplicationType getApplicationType() { return ApplicationType.of(appType); diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java index 965f21e703..07c66ff05d 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java @@ -288,7 +288,7 @@ public void onDockerPushProgressChange(DockerPushSnapshot snapshot) { /** create building pipeline instance */ private BuildPipeline createPipelineInstance(@Nonnull Application app) { FlinkEnv flinkEnv = flinkEnvService.getByIdOrDefault(app.getVersionId()); - String flinkUserJar = retrieveFlinkUserJar(flinkEnv, app); + String userLocalJar = retrieveUserLocalJar(flinkEnv, app); ExecutionMode executionMode = app.getExecutionModeEnum(); String mainClass = app.isCustomCodeJob() ? app.getMainClass() : ConfigConst.STREAMPARK_FLINKSQL_CLIENT_CLASS(); @@ -312,7 +312,7 @@ private BuildPipeline createPipelineInstance(@Nonnull Application app) { app.getJobName(), app.getLocalAppHome(), mainClass, - flinkUserJar, + userLocalJar, app.getExecutionModeEnum(), app.getDevelopmentMode(), flinkEnv.getFlinkVersion(), @@ -325,7 +325,7 @@ private BuildPipeline createPipelineInstance(@Nonnull Application app) { app.getJobName(), app.getLocalAppHome(), mainClass, - flinkUserJar, + userLocalJar, app.getExecutionModeEnum(), app.getDevelopmentMode(), flinkEnv.getFlinkVersion(), @@ -340,7 +340,7 @@ private BuildPipeline createPipelineInstance(@Nonnull Application app) { app.getJobName(), app.getLocalAppHome(), mainClass, - flinkUserJar, + userLocalJar, app.getExecutionModeEnum(), app.getDevelopmentMode(), flinkEnv.getFlinkVersion(), @@ -370,17 +370,18 @@ private void prepareJars(Application app) { localUploadDIR.mkdirs(); } + FsOperator localFS = FsOperator.lfs(); // 1. copy jar to local upload dir if (app.isFlinkSqlJob() || app.isUploadJob()) { if (!app.getDependencyObject().getJar().isEmpty()) { for (String jar : app.getDependencyObject().getJar()) { File localJar = new File(WebUtils.getAppTempDir(), jar); - File uploadJar = new File(localUploadDIR, jar); - if (!localJar.exists() && !uploadJar.exists()) { + File localUploadJar = new File(localUploadDIR, jar); + if (!localJar.exists() && !localUploadJar.exists()) { throw new ApiAlertException("Missing file: " + jar + ", please upload again"); } if (localJar.exists()) { - checkOrElseUploadJar(FsOperator.lfs(), localJar, uploadJar, localUploadDIR); + checkOrElseUploadJar(localFS, localJar, localUploadJar, localUploadDIR); } } } @@ -391,16 +392,37 @@ private void prepareJars(Application app) { FsOperator fsOperator = app.getFsOperator(); if (app.isUploadJob()) { - // 1). upload jar to local upload. + // 1). upload jar to local uploadDIR. File localJar = new File(WebUtils.getAppTempDir(), app.getJar()); - File uploadJar = new File(localUploadDIR, app.getJar()); - checkOrElseUploadJar(FsOperator.lfs(), localJar, uploadJar, localUploadDIR); + File localUploadJar = new File(localUploadDIR, app.getJar()); + checkOrElseUploadJar(localFS, localJar, localUploadJar, localUploadDIR); + + // 2) copy jar to local $app_home/lib + boolean cleanUpload = false; + File libJar = new File(app.getLocalAppLib(), app.getJar()); + if (!localFS.exists(app.getLocalAppLib())) { + cleanUpload = true; + } else { + if (libJar.exists()) { + if (!FileUtils.equals(localJar, libJar)) { + cleanUpload = true; + } + } else { + cleanUpload = true; + } + } + if (cleanUpload) { + localFS.mkCleanDirs(app.getLocalAppLib()); + localFS.upload(localUploadJar.getAbsolutePath(), app.getLocalAppLib()); + } + + // 3) for YARNApplication mode if (app.getExecutionModeEnum() == ExecutionMode.YARN_APPLICATION) { List jars = new ArrayList<>(0); // 1) user jar - jars.add(uploadJar); + jars.add(libJar); // 2). jar dependency app.getDependencyObject() @@ -413,7 +435,7 @@ private void prepareJars(Application app) { } fsOperator.mkCleanDirs(app.getAppLib()); - // upload jars to uploadDIR + // 4). upload jars to appLibDIR jars.forEach(jar -> fsOperator.upload(jar.getAbsolutePath(), app.getAppLib())); } } else { @@ -425,14 +447,14 @@ private void prepareJars(Application app) { } /** copy from {@link ApplicationServiceImpl#start(Application, boolean)} */ - private String retrieveFlinkUserJar(FlinkEnv flinkEnv, Application app) { + private String retrieveUserLocalJar(FlinkEnv flinkEnv, Application app) { switch (app.getDevelopmentMode()) { case CUSTOM_CODE: switch (app.getApplicationType()) { case STREAMPARK_FLINK: - return String.format("%s/%s", app.getAppLib(), app.getModule().concat(".jar")); + return String.format("%s/%s", app.getLocalAppLib(), app.getModule().concat(".jar")); case APACHE_FLINK: - return String.format("%s/%s", Workspace.local().APP_UPLOADS(), app.getJar()); + return String.format("%s/%s", app.getLocalAppLib(), app.getJar()); default: throw new IllegalArgumentException( "[StreamPark] unsupported ApplicationType of custom code: " @@ -440,10 +462,6 @@ private String retrieveFlinkUserJar(FlinkEnv flinkEnv, Application app) { } case FLINK_SQL: String sqlDistJar = commonService.getSqlClientJar(flinkEnv); - if (app.getExecutionModeEnum() == ExecutionMode.YARN_APPLICATION) { - String clientPath = Workspace.remote().APP_CLIENT(); - return String.format("%s/%s", clientPath, sqlDistJar); - } return Workspace.local().APP_CLIENT().concat("/").concat(sqlDistJar); default: throw new UnsupportedOperationException( diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java index 63e8141017..e894bd626c 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java @@ -1494,6 +1494,7 @@ public void start(Application appParam, boolean auto) throws Exception { if (application.getFsOperator().exists(application.getAppLib())) { flinkUserJar = String.format("%s/%s", application.getAppLib(), application.getJar()); } else { + // compatible with historical version flinkUserJar = String.format("%s/%s", application.getAppHome(), application.getJar()); } break; From 69b2bb799d067f59a227b33f69fd5e1ea49e66c7 Mon Sep 17 00:00:00 2001 From: benjobs Date: Fri, 10 Nov 2023 01:19:16 +0800 Subject: [PATCH 8/8] [Improve] release app error info improvement --- .../ApplicationBuildPipelineController.java | 93 +++++++++---------- 1 file changed, 44 insertions(+), 49 deletions(-) diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationBuildPipelineController.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationBuildPipelineController.java index 52aecfcb56..65352306fd 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationBuildPipelineController.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationBuildPipelineController.java @@ -95,56 +95,51 @@ public class ApplicationBuildPipelineController { @PermissionAction(id = "#appId", type = PermissionType.APP) @PostMapping(value = "build") @RequiresPermissions("app:create") - public RestResponse buildApplication(Long appId, boolean forceBuild) { - try { - Application app = applicationService.getById(appId); - - // 1) check flink version - FlinkEnv env = flinkEnvService.getById(app.getVersionId()); - boolean checkVersion = env.getFlinkVersion().checkVersion(false); - if (!checkVersion) { - throw new ApiAlertException( - "Unsupported flink version: " + env.getFlinkVersion().version()); - } - - // 2) check env - boolean envOk = applicationService.checkEnv(app); - if (!envOk) { - throw new ApiAlertException( - "Check flink env failed, please check the flink version of this job"); - } - - if (!forceBuild && !appBuildPipeService.allowToBuildNow(appId)) { - throw new ApiAlertException( - "The job is invalid, or the job cannot be built while it is running"); - } - // check if you need to go through the build process (if the jar and pom have changed, - // you need to go through the build process, if other common parameters are modified, - // you don't need to go through the build process) - - ApplicationLog applicationLog = new ApplicationLog(); - applicationLog.setOptionName( - org.apache.streampark.console.core.enums.Operation.RELEASE.getValue()); - applicationLog.setAppId(app.getId()); - applicationLog.setOptionTime(new Date()); - - boolean needBuild = applicationService.checkBuildAndUpdate(app); - if (!needBuild) { - applicationLog.setSuccess(true); - applicationLogService.save(applicationLog); - return RestResponse.success(true); - } - - // rollback - if (app.isNeedRollback() && app.isFlinkSqlJob()) { - flinkSqlService.rollback(app); - } - - boolean actionResult = appBuildPipeService.buildApplication(app, applicationLog); - return RestResponse.success(actionResult); - } catch (Exception e) { - return RestResponse.success(false).message(e.getMessage()); + public RestResponse buildApplication(Long appId, boolean forceBuild) throws Exception { + Application app = applicationService.getById(appId); + + // 1) check flink version + FlinkEnv env = flinkEnvService.getById(app.getVersionId()); + boolean checkVersion = env.getFlinkVersion().checkVersion(false); + if (!checkVersion) { + throw new ApiAlertException("Unsupported flink version: " + env.getFlinkVersion().version()); } + + // 2) check env + boolean envOk = applicationService.checkEnv(app); + if (!envOk) { + throw new ApiAlertException( + "Check flink env failed, please check the flink version of this job"); + } + + if (!forceBuild && !appBuildPipeService.allowToBuildNow(appId)) { + throw new ApiAlertException( + "The job is invalid, or the job cannot be built while it is running"); + } + // check if you need to go through the build process (if the jar and pom have changed, + // you need to go through the build process, if other common parameters are modified, + // you don't need to go through the build process) + + ApplicationLog applicationLog = new ApplicationLog(); + applicationLog.setOptionName( + org.apache.streampark.console.core.enums.Operation.RELEASE.getValue()); + applicationLog.setAppId(app.getId()); + applicationLog.setOptionTime(new Date()); + + boolean needBuild = applicationService.checkBuildAndUpdate(app); + if (!needBuild) { + applicationLog.setSuccess(true); + applicationLogService.save(applicationLog); + return RestResponse.success(true); + } + + // rollback + if (app.isNeedRollback() && app.isFlinkSqlJob()) { + flinkSqlService.rollback(app); + } + + boolean actionResult = appBuildPipeService.buildApplication(app, applicationLog); + return RestResponse.success(actionResult); } /**