diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala index 708943098d..77618fbd43 100644 --- a/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala @@ -79,6 +79,17 @@ object FileUtils { s"[StreamPark] FileUtils.exists: file $path is not exist!") } + def mkdir(dir: File) = { + if (dir.exists && !dir.isDirectory) { + throw new IOException(s"File $dir exists and is not a directory. Unable to create directory.") + } else if (!dir.mkdirs) { + // Double-check that some other thread or process hasn't made + if (!dir.isDirectory) { + throw new IOException(s"Unable to create directory $dir") + } + } + } + def getPathFromEnv(env: String): String = { val path = System.getenv(env) require( diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java index 12443d2cee..d75fc04fb3 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java @@ -374,9 +374,7 @@ private BuildPipeline createPipelineInstance(@Nonnull Application app) { private void prepareJars(Application app) throws IOException { File localUploadDIR = new File(Workspace.local().APP_UPLOADS()); - if (!localUploadDIR.exists()) { - localUploadDIR.mkdirs(); - } + FileUtils.mkdir(localUploadDIR); FsOperator localFS = FsOperator.lfs(); // 1. copy jar to local upload dir @@ -399,34 +397,38 @@ private void prepareJars(Application app) throws IOException { // customCode upload jar to appHome... FsOperator fsOperator = app.getFsOperator(); ResourceFrom resourceFrom = ResourceFrom.of(app.getResourceFrom()); - File localUploadJar = new File(localUploadDIR, app.getJar()); - switch (resourceFrom) { - case CICD: - // upload jar to local uploadDIR. - File userJar = getAppDistJar(app); - checkOrElseUploadJar(localFS, userJar, localUploadJar, localUploadDIR); - break; - case UPLOAD: - // 1). upload jar to local uploadDIR. - File localJar = new File(WebUtils.getAppTempDir(), app.getJar()); - checkOrElseUploadJar(localFS, localJar, localUploadJar, localUploadDIR); - break; - default: - throw new IllegalArgumentException("ResourceFrom error: " + resourceFrom); + + File userJar; + if (resourceFrom == ResourceFrom.CICD) { + userJar = getAppDistJar(app); + } else if (resourceFrom == ResourceFrom.UPLOAD) { + userJar = new File(WebUtils.getAppTempDir(), app.getJar()); + } else { + throw new IllegalArgumentException("ResourceFrom error: " + resourceFrom); } + // 2) copy user jar to localUpload DIR + File localUploadJar = new File(localUploadDIR, userJar.getName()); + checkOrElseUploadJar(localFS, userJar, localUploadJar, localUploadDIR); // 3) for YARNApplication mode if (app.getExecutionModeEnum() == ExecutionMode.YARN_APPLICATION) { + // 1) upload user jar to hdfs workspace + String pipelineJar = app.getAppHome().concat("/").concat(userJar.getName()); + if (!fsOperator.exists(pipelineJar)) { + fsOperator.upload(localUploadJar.getAbsolutePath(), app.getAppHome()); + } else { + InputStream inputStream = Files.newInputStream(localUploadJar.toPath()); + if (!DigestUtils.md5Hex(inputStream).equals(fsOperator.fileMd5(pipelineJar))) { + fsOperator.upload(localUploadJar.getAbsolutePath(), app.getAppHome()); + } + } - List hdfsUploadJars = new ArrayList<>(0); - - // 1). user jar - hdfsUploadJars.add(localUploadJar); + List dependencyJars = new ArrayList<>(0); // 2). jar dependency app.getMavenDependency() .getJar() - .forEach(jar -> hdfsUploadJars.add(new File(localUploadDIR, jar))); + .forEach(jar -> dependencyJars.add(new File(localUploadDIR, jar))); // 3). pom dependency if (!app.getMavenDependency().getPom().isEmpty()) { @@ -443,12 +445,12 @@ private void prepareJars(Application app) throws IOException { pom.toExclusionString())) .collect(Collectors.toSet()); Set mavenArts = MavenTool.resolveArtifactsAsJava(artifacts); - hdfsUploadJars.addAll(mavenArts); + dependencyJars.addAll(mavenArts); } // 4). local uploadDIR to hdfs uploadsDIR String hdfsUploadDIR = Workspace.remote().APP_UPLOADS(); - for (File jarFile : hdfsUploadJars) { + for (File jarFile : dependencyJars) { String hdfsUploadPath = hdfsUploadDIR + "/" + jarFile.getName(); if (!fsOperator.exists(hdfsUploadPath)) { fsOperator.upload(jarFile.getAbsolutePath(), hdfsUploadDIR); @@ -461,7 +463,7 @@ private void prepareJars(Application app) throws IOException { } // 5). copy jars to $hdfs_app_home/lib fsOperator.mkCleanDirs(app.getAppLib()); - hdfsUploadJars.forEach( + dependencyJars.forEach( jar -> fsOperator.copy(hdfsUploadDIR + "/" + jar.getName(), app.getAppLib())); } } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java index 99da52420b..0d1d45785f 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java @@ -1487,15 +1487,10 @@ public void start(Application appParam, boolean auto) throws Exception { case STREAMPARK_FLINK: flinkUserJar = String.format( - "%s/%s", application.getAppLib(), application.getModule().concat(".jar")); + "%s/%s", application.getAppHome(), application.getModule().concat(".jar")); break; case APACHE_FLINK: - if (application.getFsOperator().exists(application.getAppLib())) { - flinkUserJar = String.format("%s/%s", application.getAppLib(), application.getJar()); - } else { - // compatible with historical version - flinkUserJar = String.format("%s/%s", application.getAppHome(), application.getJar()); - } + flinkUserJar = String.format("%s/%s", application.getAppHome(), application.getJar()); break; default: throw new IllegalArgumentException(