Skip to content

Commit

Permalink
[Improve] yarn application mode bug fixed
Browse files Browse the repository at this point in the history
  • Loading branch information
benjobs committed Nov 17, 2023
1 parent 5db2356 commit e09febe
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,17 @@ object FileUtils {
s"[StreamPark] FileUtils.exists: file $path is not exist!")
}

def mkdir(dir: File) = {
if (dir.exists && !dir.isDirectory) {
throw new IOException(s"File $dir exists and is not a directory. Unable to create directory.")
} else if (!dir.mkdirs) {
// Double-check that some other thread or process hasn't made
if (!dir.isDirectory) {
throw new IOException(s"Unable to create directory $dir")
}
}
}

def getPathFromEnv(env: String): String = {
val path = System.getenv(env)
require(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -374,9 +374,7 @@ private BuildPipeline createPipelineInstance(@Nonnull Application app) {

private void prepareJars(Application app) throws IOException {
File localUploadDIR = new File(Workspace.local().APP_UPLOADS());
if (!localUploadDIR.exists()) {
localUploadDIR.mkdirs();
}
FileUtils.mkdir(localUploadDIR);

FsOperator localFS = FsOperator.lfs();
// 1. copy jar to local upload dir
Expand All @@ -399,34 +397,38 @@ private void prepareJars(Application app) throws IOException {
// customCode upload jar to appHome...
FsOperator fsOperator = app.getFsOperator();
ResourceFrom resourceFrom = ResourceFrom.of(app.getResourceFrom());
File localUploadJar = new File(localUploadDIR, app.getJar());
switch (resourceFrom) {
case CICD:
// upload jar to local uploadDIR.
File userJar = getAppDistJar(app);
checkOrElseUploadJar(localFS, userJar, localUploadJar, localUploadDIR);
break;
case UPLOAD:
// 1). upload jar to local uploadDIR.
File localJar = new File(WebUtils.getAppTempDir(), app.getJar());
checkOrElseUploadJar(localFS, localJar, localUploadJar, localUploadDIR);
break;
default:
throw new IllegalArgumentException("ResourceFrom error: " + resourceFrom);

File userJar;
if (resourceFrom == ResourceFrom.CICD) {
userJar = getAppDistJar(app);
} else if (resourceFrom == ResourceFrom.UPLOAD) {
userJar = new File(WebUtils.getAppTempDir(), app.getJar());
} else {
throw new IllegalArgumentException("ResourceFrom error: " + resourceFrom);
}
// 2) copy user jar to localUpload DIR
File localUploadJar = new File(localUploadDIR, userJar.getName());
checkOrElseUploadJar(localFS, userJar, localUploadJar, localUploadDIR);

// 3) for YARNApplication mode
if (app.getExecutionModeEnum() == ExecutionMode.YARN_APPLICATION) {
// 1) upload user jar to hdfs workspace
String pipelineJar = app.getAppHome().concat("/").concat(userJar.getName());
if (!fsOperator.exists(pipelineJar)) {
fsOperator.upload(localUploadJar.getAbsolutePath(), app.getAppHome());
} else {
InputStream inputStream = Files.newInputStream(localUploadJar.toPath());
if (!DigestUtils.md5Hex(inputStream).equals(fsOperator.fileMd5(pipelineJar))) {
fsOperator.upload(localUploadJar.getAbsolutePath(), app.getAppHome());
}
}

List<File> hdfsUploadJars = new ArrayList<>(0);

// 1). user jar
hdfsUploadJars.add(localUploadJar);
List<File> dependencyJars = new ArrayList<>(0);

// 2). jar dependency
app.getMavenDependency()
.getJar()
.forEach(jar -> hdfsUploadJars.add(new File(localUploadDIR, jar)));
.forEach(jar -> dependencyJars.add(new File(localUploadDIR, jar)));

// 3). pom dependency
if (!app.getMavenDependency().getPom().isEmpty()) {
Expand All @@ -443,12 +445,12 @@ private void prepareJars(Application app) throws IOException {
pom.toExclusionString()))
.collect(Collectors.toSet());
Set<File> mavenArts = MavenTool.resolveArtifactsAsJava(artifacts);
hdfsUploadJars.addAll(mavenArts);
dependencyJars.addAll(mavenArts);
}

// 4). local uploadDIR to hdfs uploadsDIR
String hdfsUploadDIR = Workspace.remote().APP_UPLOADS();
for (File jarFile : hdfsUploadJars) {
for (File jarFile : dependencyJars) {
String hdfsUploadPath = hdfsUploadDIR + "/" + jarFile.getName();
if (!fsOperator.exists(hdfsUploadPath)) {
fsOperator.upload(jarFile.getAbsolutePath(), hdfsUploadDIR);
Expand All @@ -461,7 +463,7 @@ private void prepareJars(Application app) throws IOException {
}
// 5). copy jars to $hdfs_app_home/lib
fsOperator.mkCleanDirs(app.getAppLib());
hdfsUploadJars.forEach(
dependencyJars.forEach(
jar -> fsOperator.copy(hdfsUploadDIR + "/" + jar.getName(), app.getAppLib()));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1487,15 +1487,10 @@ public void start(Application appParam, boolean auto) throws Exception {
case STREAMPARK_FLINK:
flinkUserJar =
String.format(
"%s/%s", application.getAppLib(), application.getModule().concat(".jar"));
"%s/%s", application.getAppHome(), application.getModule().concat(".jar"));
break;
case APACHE_FLINK:
if (application.getFsOperator().exists(application.getAppLib())) {
flinkUserJar = String.format("%s/%s", application.getAppLib(), application.getJar());
} else {
// compatible with historical version
flinkUserJar = String.format("%s/%s", application.getAppHome(), application.getJar());
}
flinkUserJar = String.format("%s/%s", application.getAppHome(), application.getJar());
break;
default:
throw new IllegalArgumentException(
Expand Down

0 comments on commit e09febe

Please sign in to comment.