Skip to content

Commit

Permalink
[Improve] flink yarn-app mode improvement
Browse files Browse the repository at this point in the history
  • Loading branch information
benjobs committed Nov 9, 2023
1 parent f987577 commit 3cc157e
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
Expand Down Expand Up @@ -390,29 +389,31 @@ private void prepareJars(Application app) {
if (app.isCustomCodeJob()) {
// customCode upload jar to appHome...
FsOperator fsOperator = app.getFsOperator();

if (app.isUploadJob()) {
// 1). upload jar to local upload.
File localJar = new File(WebUtils.getAppTempDir(), app.getJar());
File uploadJar = new File(localUploadDIR, app.getJar());
checkOrElseUploadJar(FsOperator.lfs(), localJar, uploadJar, localUploadDIR);

if (app.getExecutionModeEnum() == ExecutionMode.YARN_APPLICATION) {
List<File> jars = new ArrayList<>(0);

// 1) user jar
jars.add(uploadJar);

// 2). jar dependency to local upload
if (!app.getDependencyObject().getJar().isEmpty()) {
for (String jar : app.getDependencyObject().getJar()) {
jars.add(new File(localUploadDIR, jar));
}
}
// 2). jar dependency
app.getDependencyObject()
.getJar()
.forEach(jar -> jars.add(new File(localUploadDIR, jar)));

// 3. pom dependency to local upload
// 3). pom dependency
if (!app.getDependencyInfo().mavenArts().isEmpty()) {
Set<File> dependJars =
MavenTool.resolveArtifactsAsJava(app.getDependencyInfo().mavenArts());
jars.addAll(dependJars);
jars.addAll(MavenTool.resolveArtifactsAsJava(app.getDependencyInfo().mavenArts()));
}
fsOperator.mkdirs(app.getAppLib());

fsOperator.mkCleanDirs(app.getAppLib());
// upload jars to uploadDIR
jars.forEach(jar -> fsOperator.upload(jar.getAbsolutePath(), app.getAppLib()));
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ object YarnApplicationClient extends YarnClientTrait {
val providedLibs = {
val array = ListBuffer(
submitRequest.hdfsWorkspace.flinkLib,
submitRequest.hdfsWorkspace.flinkPlugins,
submitRequest.hdfsWorkspace.appJars,
submitRequest.hdfsWorkspace.appPlugins
)
Expand Down

0 comments on commit 3cc157e

Please sign in to comment.