diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java index 2b4cc92534..965f21e703 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java @@ -91,7 +91,6 @@ import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; @@ -390,29 +389,31 @@ private void prepareJars(Application app) { if (app.isCustomCodeJob()) { // customCode upload jar to appHome... FsOperator fsOperator = app.getFsOperator(); + if (app.isUploadJob()) { // 1). upload jar to local upload. File localJar = new File(WebUtils.getAppTempDir(), app.getJar()); File uploadJar = new File(localUploadDIR, app.getJar()); checkOrElseUploadJar(FsOperator.lfs(), localJar, uploadJar, localUploadDIR); + if (app.getExecutionModeEnum() == ExecutionMode.YARN_APPLICATION) { List jars = new ArrayList<>(0); + + // 1) user jar jars.add(uploadJar); - // 2). jar dependency to local upload - if (!app.getDependencyObject().getJar().isEmpty()) { - for (String jar : app.getDependencyObject().getJar()) { - jars.add(new File(localUploadDIR, jar)); - } - } + // 2). jar dependency + app.getDependencyObject() + .getJar() + .forEach(jar -> jars.add(new File(localUploadDIR, jar))); - // 3. pom dependency to local upload + // 3). pom dependency if (!app.getDependencyInfo().mavenArts().isEmpty()) { - Set dependJars = - MavenTool.resolveArtifactsAsJava(app.getDependencyInfo().mavenArts()); - jars.addAll(dependJars); + jars.addAll(MavenTool.resolveArtifactsAsJava(app.getDependencyInfo().mavenArts())); } - fsOperator.mkdirs(app.getAppLib()); + + fsOperator.mkCleanDirs(app.getAppLib()); + // upload jars to uploadDIR jars.forEach(jar -> fsOperator.upload(jar.getAbsolutePath(), app.getAppLib())); } } else { diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala index b84aae3224..26af8f8ab0 100644 --- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala @@ -62,7 +62,6 @@ object YarnApplicationClient extends YarnClientTrait { val providedLibs = { val array = ListBuffer( submitRequest.hdfsWorkspace.flinkLib, - submitRequest.hdfsWorkspace.flinkPlugins, submitRequest.hdfsWorkspace.appJars, submitRequest.hdfsWorkspace.appPlugins )