[Bug] submit job bug fixed
benjobs committed Nov 17, 2023
1 parent 4e5e267 commit bde59a7
Showing 7 changed files with 112 additions and 119 deletions.
@@ -388,26 +388,21 @@ public boolean cpFailedTrigger() {
   public String getDistHome() {
     String path =
         String.format("%s/%s/%s", Workspace.APP_LOCAL_DIST(), projectId.toString(), getModule());
-    log.info("local distHome:{}", path);
+    log.info("local distHome: {}", path);
     return path;
   }
 
-  @JsonIgnore
-  public String getDistJar() {
-    return getDistHome() + "/" + getJar();
-  }
-
   @JsonIgnore
   public String getLocalAppHome() {
     String path = String.format("%s/%s", Workspace.local().APP_WORKSPACE(), id.toString());
-    log.info("local appHome:{}", path);
+    log.info("local appHome: {}", path);
     return path;
   }
 
   @JsonIgnore
   public String getRemoteAppHome() {
     String path = String.format("%s/%s", Workspace.remote().APP_WORKSPACE(), id.toString());
-    log.info("remote appHome:{}", path);
+    log.info("remote appHome: {}", path);
     return path;
   }
@@ -19,6 +19,7 @@
 
 import org.apache.streampark.common.conf.ConfigConst;
 import org.apache.streampark.common.conf.Workspace;
+import org.apache.streampark.common.enums.ApplicationType;
 import org.apache.streampark.common.enums.ExecutionMode;
 import org.apache.streampark.common.fs.FsOperator;
 import org.apache.streampark.common.util.FileUtils;
@@ -398,91 +398,97 @@ private void prepareJars(Application app) throws IOException {
     // customCode upload jar to appHome...
     FsOperator fsOperator = app.getFsOperator();
     ResourceFrom resourceFrom = ResourceFrom.of(app.getResourceFrom());
+    File localUploadJar = new File(localUploadDIR, app.getJar());
     switch (resourceFrom) {
       case CICD:
-        String appLib = app.getAppLib();
-        fsOperator.mkCleanDirs(appLib);
-        fsOperator.upload(app.getDistJar(), appLib);
+        // upload jar to local uploadDIR.
+        File userJar = getAppDistJar(app);
+        checkOrElseUploadJar(localFS, userJar, localUploadJar, localUploadDIR);
         break;
       case UPLOAD:
         // 1). upload jar to local uploadDIR.
         File localJar = new File(WebUtils.getAppTempDir(), app.getJar());
-        File localUploadJar = new File(localUploadDIR, app.getJar());
         checkOrElseUploadJar(localFS, localJar, localUploadJar, localUploadDIR);
+        break;
+      default:
+        throw new IllegalArgumentException("ResourceFrom error: " + resourceFrom);
+    }
 
-        // 2) copy jar to local $app_home/lib
-        File libJar = new File(app.getLocalAppLib(), app.getJar());
-        if (!localFS.exists(app.getLocalAppLib())
-            || !libJar.exists()
-            || !FileUtils.equals(localJar, libJar)) {
-          localFS.mkCleanDirs(app.getLocalAppLib());
-          localFS.upload(localUploadJar.getAbsolutePath(), app.getLocalAppLib());
-        }
-
-        // 3) for YARNApplication mode
-        if (app.getExecutionModeEnum() == ExecutionMode.YARN_APPLICATION) {
-          List<File> jars = new ArrayList<>(0);
-
-          // 1). user jar
-          jars.add(libJar);
-
-          // 2). jar dependency
-          app.getMavenDependency()
-              .getJar()
-              .forEach(jar -> jars.add(new File(localUploadDIR, jar)));
-
-          // 3). pom dependency
-          if (!app.getMavenDependency().getPom().isEmpty()) {
-            Set<Artifact> artifacts =
-                app.getMavenDependency().getPom().stream()
-                    .filter(x -> !new File(localUploadDIR, x.artifactName()).exists())
-                    .map(
-                        pom ->
-                            new Artifact(
-                                pom.getGroupId(),
-                                pom.getArtifactId(),
-                                pom.getVersion(),
-                                pom.getClassifier(),
-                                pom.toExclusionString()))
-                    .collect(Collectors.toSet());
-            Set<File> mavenArts = MavenTool.resolveArtifactsAsJava(artifacts);
-            jars.addAll(mavenArts);
-          }
+    // 3) for YARNApplication mode
+    if (app.getExecutionModeEnum() == ExecutionMode.YARN_APPLICATION) {
+
+      List<File> hdfsUploadJars = new ArrayList<>(0);
+
+      // 1). user jar
+      hdfsUploadJars.add(localUploadJar);
+
+      // 2). jar dependency
+      app.getMavenDependency()
+          .getJar()
+          .forEach(jar -> hdfsUploadJars.add(new File(localUploadDIR, jar)));
+
+      // 3). pom dependency
+      if (!app.getMavenDependency().getPom().isEmpty()) {
+        Set<Artifact> artifacts =
+            app.getMavenDependency().getPom().stream()
+                .filter(x -> !new File(localUploadDIR, x.artifactName()).exists())
+                .map(
+                    pom ->
+                        new Artifact(
+                            pom.getGroupId(),
+                            pom.getArtifactId(),
+                            pom.getVersion(),
+                            pom.getClassifier(),
+                            pom.toExclusionString()))
+                .collect(Collectors.toSet());
+        Set<File> mavenArts = MavenTool.resolveArtifactsAsJava(artifacts);
+        hdfsUploadJars.addAll(mavenArts);
+      }
 
-          // 4). local uploadDIR to hdfs uploadsDIR
-          String hdfsUploadDIR = Workspace.remote().APP_UPLOADS();
-          for (File jarFile : jars) {
-            String hdfsUploadPath = hdfsUploadDIR + "/" + jarFile.getName();
-            if (!fsOperator.exists(hdfsUploadPath)) {
-              fsOperator.upload(jarFile.getAbsolutePath(), hdfsUploadDIR);
-            } else {
-              InputStream inputStream = Files.newInputStream(jarFile.toPath());
-              if (!DigestUtils.md5Hex(inputStream).equals(fsOperator.fileMd5(hdfsUploadPath))) {
-                fsOperator.upload(jarFile.getAbsolutePath(), hdfsUploadDIR);
-              }
-            }
-          }
-
-          // 5). copy jars to $hdfs_app_home/lib
-          fsOperator.mkCleanDirs(app.getAppLib());
-          jars.forEach(
-              jar -> fsOperator.copy(hdfsUploadDIR + "/" + jar.getName(), app.getAppLib()));
-        }
-        break;
-      default:
-        throw new IllegalArgumentException("ResourceFrom error: " + resourceFrom);
-    }
+      // 4). local uploadDIR to hdfs uploadsDIR
+      String hdfsUploadDIR = Workspace.remote().APP_UPLOADS();
+      for (File jarFile : hdfsUploadJars) {
+        String hdfsUploadPath = hdfsUploadDIR + "/" + jarFile.getName();
+        if (!fsOperator.exists(hdfsUploadPath)) {
+          fsOperator.upload(jarFile.getAbsolutePath(), hdfsUploadDIR);
+        } else {
+          InputStream inputStream = Files.newInputStream(jarFile.toPath());
+          if (!DigestUtils.md5Hex(inputStream).equals(fsOperator.fileMd5(hdfsUploadPath))) {
+            fsOperator.upload(jarFile.getAbsolutePath(), hdfsUploadDIR);
+          }
+        }
+      }
+
+      // 5). copy jars to $hdfs_app_home/lib
+      fsOperator.mkCleanDirs(app.getAppLib());
+      hdfsUploadJars.forEach(
+          jar -> fsOperator.copy(hdfsUploadDIR + "/" + jar.getName(), app.getAppLib()));
+    }
   }
 
+  private File getAppDistJar(Application app) {
+    File userJar;
+    if (app.getApplicationType() == ApplicationType.STREAMPARK_FLINK) {
+      userJar = new File(app.getDistHome(), app.getModule().concat(".jar"));
+    } else if (app.getApplicationType() == ApplicationType.APACHE_FLINK) {
+      userJar = new File(app.getDistHome(), app.getJar());
+    } else {
+      throw new IllegalArgumentException(
+          "[StreamPark] unsupported ApplicationType of custom code: " + app.getApplicationType());
+    }
+    return userJar;
+  }
+
   /** copy from {@link ApplicationServiceImpl#start(Application, boolean)} */
   private String retrieveUserLocalJar(FlinkEnv flinkEnv, Application app) {
+    File localUploadDIR = new File(Workspace.local().APP_UPLOADS());
     switch (app.getDevelopmentMode()) {
       case CUSTOM_CODE:
        switch (app.getApplicationType()) {
          case STREAMPARK_FLINK:
-            return String.format("%s/%s", app.getLocalAppLib(), app.getModule().concat(".jar"));
+            return String.format("%s/%s", localUploadDIR, app.getModule().concat(".jar"));
          case APACHE_FLINK:
-            return String.format("%s/%s", app.getLocalAppLib(), app.getJar());
+            return String.format("%s/%s", localUploadDIR, app.getJar());
          default:
            throw new IllegalArgumentException(
                "[StreamPark] unsupported ApplicationType of custom code: "
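A note on the upload step in the hunk above: a jar is pushed to the HDFS uploads directory only when it is missing remotely or its MD5 differs from the local copy. Below is a minimal, self-contained Scala sketch of that rule, not StreamPark code: the FsOperator operations (exists, fileMd5, upload) are passed in as plain functions, and DigestUtils comes from commons-codec as in the diff.

import java.io.File
import java.nio.file.Files
import org.apache.commons.codec.digest.DigestUtils

object UploadDedupSketch {
  // Upload `local` into `remoteDir` only when the remote copy is absent
  // or its MD5 differs from the local file's content.
  def uploadIfChanged(
      local: File,
      remoteDir: String,
      exists: String => Boolean,
      fileMd5: String => String,
      upload: (String, String) => Unit): Unit = {
    val remotePath = s"$remoteDir/${local.getName}"
    if (!exists(remotePath)) {
      upload(local.getAbsolutePath, remoteDir)
    } else {
      val in = Files.newInputStream(local.toPath)
      try {
        if (DigestUtils.md5Hex(in) != fileMd5(remotePath)) {
          upload(local.getAbsolutePath, remoteDir) // content drifted: re-upload
        }
      } finally in.close()
    }
  }
}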
@@ -18,6 +18,6 @@
 spring:
   datasource:
     username: root
-    password: streampark
+    password: 123456
     driver-class-name: com.mysql.cj.jdbc.Driver
-    url: jdbc:mysql://localhost:3306/streampark?useSSL=false&useUnicode=true&characterEncoding=UTF-8&allowPublicKeyRetrieval=false&useJDBCCompliantTimezoneShift=true&useLegacyDatetimeCode=false&serverTimezone=GMT%2B8
+    url: jdbc:mysql://localhost:3306/streampark?useUnicode=true&characterEncoding=UTF-8&useJDBCCompliantTimezoneShift=true&useLegacyDatetimeCode=false&serverTimezone=GMT%2B8
@@ -233,7 +233,7 @@ trait FlinkClientTrait extends Logger {
       submitRequest: SubmitRequest,
       jarFile: File): (PackagedProgram, JobGraph) = {
 
-    val pgkBuilder = PackagedProgram.newBuilder
+    val packageProgram = PackagedProgram.newBuilder
       .setJarFile(jarFile)
       .setEntryPointClassName(
         flinkConfig.getOptional(ApplicationConfiguration.APPLICATION_MAIN_CLASS).get())
@@ -242,14 +242,7 @@ trait FlinkClientTrait extends Logger {
         flinkConfig
           .getOptional(ApplicationConfiguration.APPLICATION_ARGS)
           .orElse(Lists.newArrayList()): _*)
-    // userClassPath...
-    submitRequest.executionMode match {
-      case ExecutionMode.REMOTE | ExecutionMode.YARN_PER_JOB =>
-        pgkBuilder.setUserClassPaths(submitRequest.flinkVersion.flinkLibs)
-      case _ =>
-    }
-
-    val packageProgram = pgkBuilder.build()
+      .build()
 
     val jobGraph = PackagedProgramUtils.createJobGraph(
       packageProgram,
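After this change the builder is chained straight into build(), so no extra user classpath is injected for REMOTE or YARN_PER_JOB submissions. For reference, a standalone sketch of the same Flink client flow is below; the jar path, entry-point class, arguments, and parallelism are placeholders, not values taken from StreamPark.

import java.io.File
import org.apache.flink.client.program.{PackagedProgram, PackagedProgramUtils}
import org.apache.flink.configuration.Configuration

object JobGraphSketch extends App {
  // build the packaged program directly, mirroring the flow above
  val program = PackagedProgram.newBuilder
    .setJarFile(new File("/tmp/my-job.jar")) // placeholder jar path
    .setEntryPointClassName("com.example.StreamJob") // placeholder main class
    .setArguments("--checkpoint", "true")
    .build()

  // compile the user program into a JobGraph (parallelism 1, output suppressed)
  val jobGraph = PackagedProgramUtils.createJobGraph(program, new Configuration(), 1, true)
  println(jobGraph.getJobID)
}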
@@ -30,13 +30,11 @@ case class Artifact(
     extensions: JavaSet[String] = Collections.emptySet()) {
 
   def filter(artifact: AetherArtifact): Boolean = {
-    artifact.getGroupId match {
-      case g if g == groupId =>
-        artifact.getArtifactId match {
-          case "*" => true
-          case a => a == artifactId
-        }
-      case _ => false
+    (this.groupId, this.artifactId) match {
+      case ("*", "*") => true
+      case (g, "*") => g == artifact.getGroupId
+      case ("*", a) => a == artifact.getArtifactId
+      case (g, a) => g == artifact.getGroupId && a == artifact.getArtifactId
     }
   }
 }
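To illustrate the new wildcard semantics, a small sketch follows. It assumes the wildcard coordinates live on the exclusion (the receiver of filter), which is what the exclusionAll check in MavenTool below implies, and that the Artifact case class above is in scope; DefaultArtifact's "group:artifact:version" constructor is the stock Aether API.

import org.eclipse.aether.artifact.DefaultArtifact

object ArtifactFilterSketch extends App {
  val dep = new DefaultArtifact("com.fasterxml.jackson.core:jackson-databind:2.13.4")

  // group-wide exclusion: artifactId wildcard
  println(Artifact("com.fasterxml.jackson.core", "*", null).filter(dep)) // true
  // exact exclusion
  println(Artifact("com.fasterxml.jackson.core", "jackson-databind", null).filter(dep)) // true
  // different group: not excluded
  println(Artifact("org.slf4j", "*", null).filter(dep)) // false
}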
@@ -29,7 +29,7 @@ import org.apache.maven.repository.internal.MavenRepositorySystemUtils
 import org.codehaus.plexus.logging.{Logger => PlexusLog}
 import org.codehaus.plexus.logging.console.ConsoleLogger
 import org.eclipse.aether.{RepositorySystem, RepositorySystemSession}
-import org.eclipse.aether.artifact.DefaultArtifact
+import org.eclipse.aether.artifact.{Artifact, DefaultArtifact}
 import org.eclipse.aether.connector.basic.BasicRepositoryConnectorFactory
 import org.eclipse.aether.repository.{LocalRepository, RemoteRepository}
 import org.eclipse.aether.resolution.{ArtifactDescriptorRequest, ArtifactRequest}
@@ -205,23 +205,29 @@ object MavenTool extends Logger {
 
     val (repoSystem, session) = getMavenEndpoint()
 
-    val artifacts = mavenArtifacts.map(
-      e => new DefaultArtifact(e.groupId, e.artifactId, e.classifier, "jar", e.version))
-
     val exclusions = mavenArtifacts
       .flatMap(_.extensions.map(_.split(":")))
       .map(a => Artifact(a.head, a.last, null)) ++ excludeArtifact
 
     val remoteRepos = getRemoteRepos()
 
+    val exclusionAll = exclusions.exists(e => e.groupId == "*" && e.artifactId == "*")
+
+    val artifacts = mavenArtifacts.map(
+      e => new DefaultArtifact(e.groupId, e.artifactId, e.classifier, "jar", e.version))
+
     // read relevant artifact descriptor info and excluding items if necessary.
-    val dependencies = artifacts
-      .map(artifact => new ArtifactDescriptorRequest(artifact, remoteRepos, null))
-      .map(descReq => repoSystem.readArtifactDescriptor(session, descReq))
-      .flatMap(_.getDependencies)
-      .filter(_.getScope == "compile")
-      .filter(dep => !exclusions.exists(_.filter(dep.getArtifact)))
-      .map(_.getArtifact)
+    val dependencies =
+      if (exclusionAll) Set.empty[DefaultArtifact]
+      else {
+        artifacts
+          .map(artifact => new ArtifactDescriptorRequest(artifact, remoteRepos, null))
+          .map(descReq => repoSystem.readArtifactDescriptor(session, descReq))
+          .flatMap(_.getDependencies)
+          .filter(_.getScope == "compile")
+          .filter(dep => !exclusions.exists(_.filter(dep.getArtifact)))
+          .map(_.getArtifact)
+      }
 
     val mergedArtifacts = artifacts ++ dependencies
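The exclusionAll flag above short-circuits transitive resolution: a "*:*" exclusion keeps only the artifacts the user listed explicitly. A self-contained sketch of that check follows, with Exclusion standing in for the two Artifact fields involved; the "groupId:artifactId" string format mirrors the extensions parsing shown above.

final case class Exclusion(groupId: String, artifactId: String)

object ExclusionAllSketch extends App {
  // exclusions parsed from "groupId:artifactId" strings, as in the diff
  val extensions = Seq("com.google.guava:guava", "*:*")
  val exclusions = extensions.map(_.split(":")).map(a => Exclusion(a.head, a.last))

  val exclusionAll = exclusions.exists(e => e.groupId == "*" && e.artifactId == "*")
  println(exclusionAll) // true: skip readArtifactDescriptor, resolve only direct jars
}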
@@ -43,28 +43,22 @@ class FlinkYarnApplicationBuildPipeline(request: FlinkYarnApplicationBuildRequest)
    */
   @throws[Throwable]
   override protected def buildProcess(): SimpleBuildResponse = {
-    execStep(1) {
-      request.developmentMode match {
-        case DevelopmentMode.FLINK_SQL => HdfsOperator.mkCleanDirs(request.yarnProvidedPath)
-        case _ =>
-      }
-      logInfo(s"recreate building workspace: ${request.yarnProvidedPath}")
-    }.getOrElse(throw getError.exception)
-
-    val mavenJars =
-      execStep(2) {
-        request.developmentMode match {
-          case DevelopmentMode.FLINK_SQL =>
-            val mavenArts = MavenTool.resolveArtifacts(request.dependencyInfo.mavenArts)
-            mavenArts.map(_.getAbsolutePath) ++ request.dependencyInfo.extJarLibs
-          case _ => Set[String]()
-        }
-      }.getOrElse(throw getError.exception)
-
-    execStep(3) {
-      mavenJars.foreach(jar => uploadToHdfs(FsOperator.hdfs, jar, request.yarnProvidedPath))
-    }.getOrElse(throw getError.exception)
+    if (request.developmentMode == DevelopmentMode.FLINK_SQL) {
+      execStep(1) {
+        HdfsOperator.mkCleanDirs(request.yarnProvidedPath)
+        logInfo(s"recreate building workspace: ${request.yarnProvidedPath}")
+      }.getOrElse(throw getError.exception)
+
+      val mavenJars =
+        execStep(2) {
+          val mavenArts = MavenTool.resolveArtifacts(request.dependencyInfo.mavenArts)
+          mavenArts.map(_.getAbsolutePath) ++ request.dependencyInfo.extJarLibs
+        }.getOrElse(throw getError.exception)
+
+      execStep(3) {
+        mavenJars.foreach(jar => uploadToHdfs(FsOperator.hdfs, jar, request.yarnProvidedPath))
+      }.getOrElse(throw getError.exception)
+    }
     SimpleBuildResponse()
   }
