[Bug] submit flink job bug fixed (#3348)
* [Improve] submit flink job on yarn application|perjob mode bug fixed

---------

Co-authored-by: benjobs <[email protected]>
wolfboys and benjobs authored Nov 18, 2023
1 parent 4e5e267 commit ad848d1
Showing 9 changed files with 140 additions and 134 deletions.
FileUtils.scala
@@ -79,6 +79,17 @@ object FileUtils {
       s"[StreamPark] FileUtils.exists: file $path is not exist!")
   }

+  def mkdir(dir: File) = {
+    if (dir.exists && !dir.isDirectory) {
+      throw new IOException(s"File $dir exists and is not a directory. Unable to create directory.")
+    } else if (!dir.mkdirs) {
+      // Double-check that some other thread or process hasn't made the directory in the meantime
+      if (!dir.isDirectory) {
+        throw new IOException(s"Unable to create directory $dir")
+      }
+    }
+  }
+
   def getPathFromEnv(env: String): String = {
     val path = System.getenv(env)
     require(
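The helper mirrors the double-check idiom from commons-io's forceMkdir: File#mkdirs returns false both on genuine failure and when a concurrent caller created the directory first, so the code re-tests isDirectory before raising. A minimal usage sketch, assuming the streampark-common module is on the classpath; MkdirSketch and the temp path are illustrative, not part of this commit:

import java.io.{File, IOException}

import org.apache.streampark.common.util.FileUtils

object MkdirSketch extends App {
  // Illustrative path; any writable location works.
  val uploadDir = new File(System.getProperty("java.io.tmpdir"), "streampark-uploads")
  try {
    // Idempotent and safe under concurrent callers: an existing directory
    // is accepted, an existing regular file raises IOException.
    FileUtils.mkdir(uploadDir)
  } catch {
    case e: IOException => Console.err.println(s"cannot prepare $uploadDir: ${e.getMessage}")
  }
}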
Application.java
@@ -388,26 +388,21 @@ public boolean cpFailedTrigger() {
   public String getDistHome() {
     String path =
         String.format("%s/%s/%s", Workspace.APP_LOCAL_DIST(), projectId.toString(), getModule());
-    log.info("local distHome:{}", path);
+    log.info("local distHome: {}", path);
     return path;
   }

-  @JsonIgnore
-  public String getDistJar() {
-    return getDistHome() + "/" + getJar();
-  }
-
   @JsonIgnore
   public String getLocalAppHome() {
     String path = String.format("%s/%s", Workspace.local().APP_WORKSPACE(), id.toString());
-    log.info("local appHome:{}", path);
+    log.info("local appHome: {}", path);
     return path;
   }

   @JsonIgnore
   public String getRemoteAppHome() {
     String path = String.format("%s/%s", Workspace.remote().APP_WORKSPACE(), id.toString());
-    log.info("remote appHome:{}", path);
+    log.info("remote appHome: {}", path);
     return path;
   }
AppBuildPipeServiceImpl.java
@@ -19,6 +19,7 @@

 import org.apache.streampark.common.conf.ConfigConst;
 import org.apache.streampark.common.conf.Workspace;
+import org.apache.streampark.common.enums.ApplicationType;
 import org.apache.streampark.common.enums.ExecutionMode;
 import org.apache.streampark.common.fs.FsOperator;
 import org.apache.streampark.common.util.FileUtils;
@@ -373,9 +374,7 @@ private BuildPipeline createPipelineInstance(@Nonnull Application app) {

   private void prepareJars(Application app) throws IOException {
     File localUploadDIR = new File(Workspace.local().APP_UPLOADS());
-    if (!localUploadDIR.exists()) {
-      localUploadDIR.mkdirs();
-    }
+    FileUtils.mkdir(localUploadDIR);

     FsOperator localFS = FsOperator.lfs();
     // 1. copy jar to local upload dir
@@ -398,91 +397,106 @@
       // customCode upload jar to appHome...
       FsOperator fsOperator = app.getFsOperator();
       ResourceFrom resourceFrom = ResourceFrom.of(app.getResourceFrom());
-      switch (resourceFrom) {
-        case CICD:
-          String appLib = app.getAppLib();
-          fsOperator.mkCleanDirs(appLib);
-          fsOperator.upload(app.getDistJar(), appLib);
-          break;
-        case UPLOAD:
-          // 1). upload jar to local uploadDIR.
-          File localJar = new File(WebUtils.getAppTempDir(), app.getJar());
-          File localUploadJar = new File(localUploadDIR, app.getJar());
-          checkOrElseUploadJar(localFS, localJar, localUploadJar, localUploadDIR);
-
-          // 2) copy jar to local $app_home/lib
-          File libJar = new File(app.getLocalAppLib(), app.getJar());
-          if (!localFS.exists(app.getLocalAppLib())
-              || !libJar.exists()
-              || !FileUtils.equals(localJar, libJar)) {
-            localFS.mkCleanDirs(app.getLocalAppLib());
-            localFS.upload(localUploadJar.getAbsolutePath(), app.getLocalAppLib());
-          }
-
-          // 3) for YARNApplication mode
-          if (app.getExecutionModeEnum() == ExecutionMode.YARN_APPLICATION) {
-            List<File> jars = new ArrayList<>(0);
-
-            // 1). user jar
-            jars.add(libJar);
-
-            // 2). jar dependency
-            app.getMavenDependency()
-                .getJar()
-                .forEach(jar -> jars.add(new File(localUploadDIR, jar)));
-
-            // 3). pom dependency
-            if (!app.getMavenDependency().getPom().isEmpty()) {
-              Set<Artifact> artifacts =
-                  app.getMavenDependency().getPom().stream()
-                      .filter(x -> !new File(localUploadDIR, x.artifactName()).exists())
-                      .map(
-                          pom ->
-                              new Artifact(
-                                  pom.getGroupId(),
-                                  pom.getArtifactId(),
-                                  pom.getVersion(),
-                                  pom.getClassifier(),
-                                  pom.toExclusionString()))
-                      .collect(Collectors.toSet());
-              Set<File> mavenArts = MavenTool.resolveArtifactsAsJava(artifacts);
-              jars.addAll(mavenArts);
-            }
-
-            // 4). local uploadDIR to hdfs uploadsDIR
-            String hdfsUploadDIR = Workspace.remote().APP_UPLOADS();
-            for (File jarFile : jars) {
-              String hdfsUploadPath = hdfsUploadDIR + "/" + jarFile.getName();
-              if (!fsOperator.exists(hdfsUploadPath)) {
-                fsOperator.upload(jarFile.getAbsolutePath(), hdfsUploadDIR);
-              } else {
-                InputStream inputStream = Files.newInputStream(jarFile.toPath());
-                if (!DigestUtils.md5Hex(inputStream).equals(fsOperator.fileMd5(hdfsUploadPath))) {
-                  fsOperator.upload(jarFile.getAbsolutePath(), hdfsUploadDIR);
-                }
-              }
-            }
-
-            // 5). copy jars to $hdfs_app_home/lib
-            fsOperator.mkCleanDirs(app.getAppLib());
-            jars.forEach(
-                jar -> fsOperator.copy(hdfsUploadDIR + "/" + jar.getName(), app.getAppLib()));
-          }
-          break;
-        default:
-          throw new IllegalArgumentException("ResourceFrom error: " + resourceFrom);
-      }
+      File userJar;
+      if (resourceFrom == ResourceFrom.CICD) {
+        userJar = getAppDistJar(app);
+      } else if (resourceFrom == ResourceFrom.UPLOAD) {
+        userJar = new File(WebUtils.getAppTempDir(), app.getJar());
+      } else {
+        throw new IllegalArgumentException("ResourceFrom error: " + resourceFrom);
+      }
+
+      // 2) copy user jar to localUpload DIR
+      File localUploadJar = new File(localUploadDIR, userJar.getName());
+      checkOrElseUploadJar(localFS, userJar, localUploadJar, localUploadDIR);
+
+      // 3) for YARNApplication mode
+      if (app.getExecutionModeEnum() == ExecutionMode.YARN_APPLICATION) {
+        // 1) upload user jar to hdfs workspace
+        if (!fsOperator.exists(app.getAppHome())) {
+          fsOperator.mkdirs(app.getAppHome());
+        }
+        String pipelineJar = app.getAppHome().concat("/").concat(userJar.getName());
+        if (!fsOperator.exists(pipelineJar)) {
+          fsOperator.upload(localUploadJar.getAbsolutePath(), app.getAppHome());
+        } else {
+          InputStream inputStream = Files.newInputStream(localUploadJar.toPath());
+          if (!DigestUtils.md5Hex(inputStream).equals(fsOperator.fileMd5(pipelineJar))) {
+            fsOperator.upload(localUploadJar.getAbsolutePath(), app.getAppHome());
+          }
+        }

+        List<File> dependencyJars = new ArrayList<>(0);
+
+        // 2). jar dependency
+        app.getMavenDependency()
+            .getJar()
+            .forEach(jar -> dependencyJars.add(new File(localUploadDIR, jar)));
+
+        // 3). pom dependency
+        if (!app.getMavenDependency().getPom().isEmpty()) {
+          Set<Artifact> artifacts =
+              app.getMavenDependency().getPom().stream()
+                  .filter(x -> !new File(localUploadDIR, x.artifactName()).exists())
+                  .map(
+                      pom ->
+                          new Artifact(
+                              pom.getGroupId(),
+                              pom.getArtifactId(),
+                              pom.getVersion(),
+                              pom.getClassifier(),
+                              pom.toExclusionString()))
+                  .collect(Collectors.toSet());
+          Set<File> mavenArts = MavenTool.resolveArtifactsAsJava(artifacts);
+          dependencyJars.addAll(mavenArts);
+        }
+
+        // 4). local uploadDIR to hdfs uploadsDIR
+        String hdfsUploadDIR = Workspace.remote().APP_UPLOADS();
+        for (File jarFile : dependencyJars) {
+          String hdfsUploadPath = hdfsUploadDIR + "/" + jarFile.getName();
+          if (!fsOperator.exists(hdfsUploadPath)) {
+            fsOperator.upload(jarFile.getAbsolutePath(), hdfsUploadDIR);
+          } else {
+            InputStream inputStream = Files.newInputStream(jarFile.toPath());
+            if (!DigestUtils.md5Hex(inputStream).equals(fsOperator.fileMd5(hdfsUploadPath))) {
+              fsOperator.upload(jarFile.getAbsolutePath(), hdfsUploadDIR);
+            }
+          }
+        }
+
+        // 5). copy jars to $hdfs_app_home/lib
+        if (!fsOperator.exists(app.getAppLib())) {
+          fsOperator.mkdirs(app.getAppLib());
+        } else {
+          fsOperator.mkCleanDirs(app.getAppLib());
+        }
+        dependencyJars.forEach(
+            jar -> fsOperator.copy(hdfsUploadDIR + "/" + jar.getName(), app.getAppLib()));
+      }
     }
   }

+  private File getAppDistJar(Application app) {
+    if (app.getApplicationType() == ApplicationType.STREAMPARK_FLINK) {
+      return new File(app.getDistHome(), app.getModule().concat(".jar"));
+    }
+    if (app.getApplicationType() == ApplicationType.APACHE_FLINK) {
+      return new File(app.getDistHome(), app.getJar());
+    }
+    throw new IllegalArgumentException(
+        "[StreamPark] unsupported ApplicationType of custom code: " + app.getApplicationType());
+  }
+
   /** copy from {@link ApplicationServiceImpl#start(Application, boolean)} */
   private String retrieveUserLocalJar(FlinkEnv flinkEnv, Application app) {
+    File localUploadDIR = new File(Workspace.local().APP_UPLOADS());
     switch (app.getDevelopmentMode()) {
       case CUSTOM_CODE:
         switch (app.getApplicationType()) {
           case STREAMPARK_FLINK:
-            return String.format("%s/%s", app.getLocalAppLib(), app.getModule().concat(".jar"));
+            return String.format("%s/%s", localUploadDIR, app.getModule().concat(".jar"));
           case APACHE_FLINK:
-            return String.format("%s/%s", app.getLocalAppLib(), app.getJar());
+            return String.format("%s/%s", localUploadDIR, app.getJar());
           default:
             throw new IllegalArgumentException(
                 "[StreamPark] unsupported ApplicationType of custom code: "
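Both upload steps above follow the same rule: push a jar to the target filesystem only if it is absent there or its MD5 differs from the local copy, so unchanged artifacts are never re-uploaded. A standalone sketch of that rule, assuming only commons-codec and java.nio; uploadIfChanged and remoteMd5 are illustrative names standing in for FsOperator.fileMd5 and FsOperator.upload, not APIs from this commit:

import java.io.File
import java.nio.file.Files

import org.apache.commons.codec.digest.DigestUtils

// remoteMd5 stands in for FsOperator.fileMd5 (None when the remote file is absent);
// the upload callback stands in for FsOperator.upload.
def uploadIfChanged(local: File, remoteMd5: Option[String])(upload: File => Unit): Unit = {
  val in = Files.newInputStream(local.toPath)
  try {
    val localMd5 = DigestUtils.md5Hex(in)
    // Absent remote file or different content: upload; identical content: skip.
    if (!remoteMd5.contains(localMd5)) upload(local)
  } finally in.close()
}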
ApplicationServiceImpl.java
@@ -1487,15 +1487,10 @@ public void start(Application appParam, boolean auto) throws Exception {
       case STREAMPARK_FLINK:
         flinkUserJar =
             String.format(
-                "%s/%s", application.getAppLib(), application.getModule().concat(".jar"));
+                "%s/%s", application.getAppHome(), application.getModule().concat(".jar"));
         break;
       case APACHE_FLINK:
-        if (application.getFsOperator().exists(application.getAppLib())) {
-          flinkUserJar = String.format("%s/%s", application.getAppLib(), application.getJar());
-        } else {
-          // compatible with historical version
-          flinkUserJar = String.format("%s/%s", application.getAppHome(), application.getJar());
-        }
+        flinkUserJar = String.format("%s/%s", application.getAppHome(), application.getJar());
         break;
       default:
         throw new IllegalArgumentException(
application-mysql.yml
@@ -20,4 +20,4 @@ spring:
     username: root
     password: streampark
     driver-class-name: com.mysql.cj.jdbc.Driver
-    url: jdbc:mysql://localhost:3306/streampark?useSSL=false&useUnicode=true&characterEncoding=UTF-8&allowPublicKeyRetrieval=false&useJDBCCompliantTimezoneShift=true&useLegacyDatetimeCode=false&serverTimezone=GMT%2B8
+    url: jdbc:mysql://localhost:3306/streampark?useUnicode=true&characterEncoding=UTF-8&useJDBCCompliantTimezoneShift=true&useLegacyDatetimeCode=false&serverTimezone=GMT%2B8
FlinkClientTrait.scala
@@ -233,7 +233,7 @@ trait FlinkClientTrait extends Logger {
       submitRequest: SubmitRequest,
       jarFile: File): (PackagedProgram, JobGraph) = {

-    val pgkBuilder = PackagedProgram.newBuilder
+    val packageProgram = PackagedProgram.newBuilder
       .setJarFile(jarFile)
       .setEntryPointClassName(
         flinkConfig.getOptional(ApplicationConfiguration.APPLICATION_MAIN_CLASS).get())
@@ -242,14 +242,7 @@
         flinkConfig
           .getOptional(ApplicationConfiguration.APPLICATION_ARGS)
           .orElse(Lists.newArrayList()): _*)
-    // userClassPath...
-    submitRequest.executionMode match {
-      case ExecutionMode.REMOTE | ExecutionMode.YARN_PER_JOB =>
-        pgkBuilder.setUserClassPaths(submitRequest.flinkVersion.flinkLibs)
-      case _ =>
-    }
-
-    val packageProgram = pgkBuilder.build()
+      .build()

     val jobGraph = PackagedProgramUtils.createJobGraph(
       packageProgram,
Artifact.scala
@@ -30,13 +30,11 @@ case class Artifact(
     extensions: JavaSet[String] = Collections.emptySet()) {

   def filter(artifact: AetherArtifact): Boolean = {
-    artifact.getGroupId match {
-      case g if g == groupId =>
-        artifact.getArtifactId match {
-          case "*" => true
-          case a => a == artifactId
-        }
-      case _ => false
+    (artifact.getGroupId, artifact.getArtifactId) match {
+      case ("*", "*") => true
+      case (g, "*") => g == this.groupId
+      case ("*", a) => a == this.artifactId
+      case (g, a) => g == this.groupId && a == this.artifactId
     }
   }
 }
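The rewritten filter compares both coordinates in a single tuple match instead of nested matches on groupId and then artifactId. A quick illustration of an exact-match exclusion against a resolved dependency, assuming Artifact is the case class above (package path and coordinates invented for illustration); the all-wildcard `*:*` case is additionally short-circuited in MavenTool below:

import org.apache.streampark.flink.packer.maven.Artifact
import org.eclipse.aether.artifact.DefaultArtifact

object ArtifactFilterSketch extends App {
  // Invented coordinates, purely for illustration.
  val exclusion = Artifact("com.google.guava", "guava", null)
  val dep = new DefaultArtifact("com.google.guava", "guava", "jar", "31.1-jre")

  // Both coordinates match exactly, so the dependency is filtered out.
  assert(exclusion.filter(dep))
}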
MavenTool.scala
@@ -29,7 +29,7 @@ import org.apache.maven.repository.internal.MavenRepositorySystemUtils
 import org.codehaus.plexus.logging.{Logger => PlexusLog}
 import org.codehaus.plexus.logging.console.ConsoleLogger
 import org.eclipse.aether.{RepositorySystem, RepositorySystemSession}
-import org.eclipse.aether.artifact.DefaultArtifact
+import org.eclipse.aether.artifact.{Artifact, DefaultArtifact}
 import org.eclipse.aether.connector.basic.BasicRepositoryConnectorFactory
 import org.eclipse.aether.repository.{LocalRepository, RemoteRepository}
 import org.eclipse.aether.resolution.{ArtifactDescriptorRequest, ArtifactRequest}
@@ -205,23 +205,29 @@

     val (repoSystem, session) = getMavenEndpoint()

-    val artifacts = mavenArtifacts.map(
-      e => new DefaultArtifact(e.groupId, e.artifactId, e.classifier, "jar", e.version))
-
     val exclusions = mavenArtifacts
       .flatMap(_.extensions.map(_.split(":")))
       .map(a => Artifact(a.head, a.last, null)) ++ excludeArtifact

     val remoteRepos = getRemoteRepos()

+    val exclusionAll = exclusions.exists(e => e.groupId == "*" && e.artifactId == "*")
+
+    val artifacts = mavenArtifacts.map(
+      e => new DefaultArtifact(e.groupId, e.artifactId, e.classifier, "jar", e.version))
+
     // read relevant artifact descriptor info and excluding items if necessary.
-    val dependencies = artifacts
-      .map(artifact => new ArtifactDescriptorRequest(artifact, remoteRepos, null))
-      .map(descReq => repoSystem.readArtifactDescriptor(session, descReq))
-      .flatMap(_.getDependencies)
-      .filter(_.getScope == "compile")
-      .filter(dep => !exclusions.exists(_.filter(dep.getArtifact)))
-      .map(_.getArtifact)
+    val dependencies =
+      if (exclusionAll) Set.empty[DefaultArtifact]
+      else {
+        artifacts
+          .map(artifact => new ArtifactDescriptorRequest(artifact, remoteRepos, null))
+          .map(descReq => repoSystem.readArtifactDescriptor(session, descReq))
+          .flatMap(_.getDependencies)
+          .filter(_.getScope == "compile")
+          .filter(dep => !exclusions.exists(_.filter(dep.getArtifact)))
+          .map(_.getArtifact)
+      }

     val mergedArtifacts = artifacts ++ dependencies
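With exclusionAll, a single `*:*` exclusion disables transitive resolution altogether: dependencies stays empty and only the explicitly listed artifacts are merged. A sketch of triggering that path, assuming the extensions field carries group:artifact exclusion strings as the mapping above suggests, and assuming the package path from the repo layout (coordinates invented):

import java.util.Collections

import org.apache.streampark.flink.packer.maven.{Artifact, MavenTool}

object ExclusionAllSketch extends App {
  // "*:*" is split into ("*", "*") and becomes Artifact("*", "*", null),
  // so exclusionAll is true and no artifact descriptors are read:
  // the artifact itself is resolved with no transitive dependencies.
  val selfOnly = Artifact(
    "org.apache.flink", // invented coordinates, for illustration only
    "flink-connector-kafka",
    "1.17.1",
    null,
    Collections.singleton("*:*"))

  val jars = MavenTool.resolveArtifactsAsJava(Collections.singleton(selfOnly))
  jars.forEach(f => println(f.getName))
}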