[Bug] Fix Flink job deployment in YARN application mode #3325

Merged · 8 commits · Nov 9, 2023
@@ -97,7 +97,7 @@ streampark:
   # local workspace, used to store source code and build dir etc.
   workspace:
     local: /opt/streampark_workspace
-    remote: hdfs://hdfscluster/streampark # supports hdfs:///streampark/, /streampark, hdfs://host:port/streampark/
+    remote: hdfs:///streampark # supports hdfs:///streampark/, /streampark, hdfs://host:port/streampark/

   # remote docker registry namespace for streampark
   docker:
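
Note on the workspace.remote forms above: they differ only in whether the HDFS authority is spelled out. hdfs:///streampark and the bare /streampark both resolve against the default filesystem from the Hadoop client configuration, while hdfs://host:port/streampark/ names a namenode explicitly. A minimal JDK-only sketch (illustration, not StreamPark code; the host and port below are placeholders) of how the three forms decompose:

// Illustration only: java.net.URI splits each documented form into
// scheme / authority / path; only the authority part varies.
import java.net.URI;

public class WorkspaceUriDemo {
  public static void main(String[] args) {
    String[] forms = {
      "hdfs:///streampark/",             // default FS from the Hadoop client config
      "/streampark",                     // scheme-less, also resolved against the default FS
      "hdfs://namenode:8020/streampark/" // explicit host and port (placeholder values)
    };
    for (String form : forms) {
      URI uri = URI.create(form);
      System.out.printf("%-34s scheme=%s authority=%s path=%s%n",
          form, uri.getScheme(), uri.getAuthority(), uri.getPath());
    }
  }
}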
@@ -95,56 +95,51 @@ public class ApplicationBuildPipelineController {
@PermissionAction(id = "#appId", type = PermissionType.APP)
@PostMapping(value = "build")
@RequiresPermissions("app:create")
-  public RestResponse buildApplication(Long appId, boolean forceBuild) {
-    try {
-      Application app = applicationService.getById(appId);
-
-      // 1) check flink version
-      FlinkEnv env = flinkEnvService.getById(app.getVersionId());
-      boolean checkVersion = env.getFlinkVersion().checkVersion(false);
-      if (!checkVersion) {
-        throw new ApiAlertException(
-            "Unsupported flink version: " + env.getFlinkVersion().version());
-      }
-
-      // 2) check env
-      boolean envOk = applicationService.checkEnv(app);
-      if (!envOk) {
-        throw new ApiAlertException(
-            "Check flink env failed, please check the flink version of this job");
-      }
-
-      if (!forceBuild && !appBuildPipeService.allowToBuildNow(appId)) {
-        throw new ApiAlertException(
-            "The job is invalid, or the job cannot be built while it is running");
-      }
-      // check if you need to go through the build process (if the jar and pom have changed,
-      // you need to go through the build process, if other common parameters are modified,
-      // you don't need to go through the build process)
-
-      ApplicationLog applicationLog = new ApplicationLog();
-      applicationLog.setOptionName(
-          org.apache.streampark.console.core.enums.Operation.RELEASE.getValue());
-      applicationLog.setAppId(app.getId());
-      applicationLog.setOptionTime(new Date());
-
-      boolean needBuild = applicationService.checkBuildAndUpdate(app);
-      if (!needBuild) {
-        applicationLog.setSuccess(true);
-        applicationLogService.save(applicationLog);
-        return RestResponse.success(true);
-      }
-
-      // rollback
-      if (app.isNeedRollback() && app.isFlinkSqlJob()) {
-        flinkSqlService.rollback(app);
-      }
-
-      boolean actionResult = appBuildPipeService.buildApplication(app, applicationLog);
-      return RestResponse.success(actionResult);
-    } catch (Exception e) {
-      return RestResponse.success(false).message(e.getMessage());
+  public RestResponse buildApplication(Long appId, boolean forceBuild) throws Exception {
+    Application app = applicationService.getById(appId);
+
+    // 1) check flink version
+    FlinkEnv env = flinkEnvService.getById(app.getVersionId());
+    boolean checkVersion = env.getFlinkVersion().checkVersion(false);
+    if (!checkVersion) {
+      throw new ApiAlertException("Unsupported flink version: " + env.getFlinkVersion().version());
+    }
+
+    // 2) check env
+    boolean envOk = applicationService.checkEnv(app);
+    if (!envOk) {
+      throw new ApiAlertException(
+          "Check flink env failed, please check the flink version of this job");
+    }
+
+    if (!forceBuild && !appBuildPipeService.allowToBuildNow(appId)) {
+      throw new ApiAlertException(
+          "The job is invalid, or the job cannot be built while it is running");
+    }
+
+    ApplicationLog applicationLog = new ApplicationLog();
+    applicationLog.setOptionName(
+        org.apache.streampark.console.core.enums.Operation.RELEASE.getValue());
+    applicationLog.setAppId(app.getId());
+    applicationLog.setOptionTime(new Date());
+
+    // A full build is only required when the jar or pom dependencies changed;
+    // changes to other common parameters can skip the build process.
+    boolean needBuild = applicationService.checkBuildAndUpdate(app);
+    if (!needBuild) {
+      applicationLog.setSuccess(true);
+      applicationLogService.save(applicationLog);
+      return RestResponse.success(true);
+    }
+
+    // rollback
+    if (app.isNeedRollback() && app.isFlinkSqlJob()) {
+      flinkSqlService.rollback(app);
+    }
+
+    boolean actionResult = appBuildPipeService.buildApplication(app, applicationLog);
+    return RestResponse.success(actionResult);
}
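
With the surrounding try/catch removed, buildApplication now declares "throws Exception", so ApiAlertException and friends propagate out of the controller instead of being folded into a success(false) response inline. This PR does not show the handler that catches them; a hypothetical Spring-style sketch (the advice class is assumed, not part of this change) of how the propagated exception could be turned back into the same failure payload:

// Hypothetical sketch, not part of this PR: a global exception advice that
// rebuilds the payload the removed catch block used to return.
import org.springframework.web.bind.annotation.ExceptionHandler;
import org.springframework.web.bind.annotation.RestControllerAdvice;

@RestControllerAdvice
public class ApiAlertExceptionAdvice {

  @ExceptionHandler(ApiAlertException.class)
  public RestResponse handleApiAlert(ApiAlertException e) {
    // mirror the old inline behavior: success=false plus the alert message
    return RestResponse.success(false).message(e.getMessage());
  }
}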

@@ -439,6 +439,11 @@ public String getAppLib() {
return getAppHome().concat("/lib");
}

+  @JsonIgnore
+  public String getLocalAppLib() {
+    return getLocalAppHome().concat("/lib");
+  }

@JsonIgnore
public ApplicationType getApplicationType() {
return ApplicationType.of(appType);
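
The new getLocalAppLib() is the local-workspace twin of getAppLib(): the same "/lib" suffix, rooted under the local app home instead of the remote (HDFS) one. A worked illustration with assumed paths (the concrete workspace layout is internal to StreamPark):

// Illustration only: the two lib directories for one job, paths assumed.
public class AppLibPathsDemo {
  public static void main(String[] args) {
    String localAppHome = "/opt/streampark_workspace/app/100001"; // hypothetical local app home
    String remoteAppHome = "hdfs:///streampark/app/100001";       // hypothetical remote app home

    String localAppLib = localAppHome.concat("/lib");   // what getLocalAppLib() would return
    String remoteAppLib = remoteAppHome.concat("/lib"); // what getAppLib() would return

    System.out.println(localAppLib);  // /opt/streampark_workspace/app/100001/lib
    System.out.println(remoteAppLib); // hdfs:///streampark/app/100001/lib
  }
}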
@@ -19,8 +19,6 @@

import org.apache.streampark.common.conf.ConfigConst;
import org.apache.streampark.common.conf.Workspace;
-import org.apache.streampark.common.enums.ApplicationType;
-import org.apache.streampark.common.enums.DevelopmentMode;
import org.apache.streampark.common.enums.ExecutionMode;
import org.apache.streampark.common.fs.FsOperator;
import org.apache.streampark.common.util.FileUtils;
@@ -52,6 +50,7 @@
import org.apache.streampark.console.core.service.SettingService;
import org.apache.streampark.console.core.task.FlinkRESTAPIWatcher;
import org.apache.streampark.flink.packer.docker.DockerConf;
+import org.apache.streampark.flink.packer.maven.MavenTool;
import org.apache.streampark.flink.packer.pipeline.BuildPipeline;
import org.apache.streampark.flink.packer.pipeline.BuildResult;
import org.apache.streampark.flink.packer.pipeline.DockerBuildSnapshot;
@@ -87,6 +86,7 @@
import javax.annotation.Nonnull;

import java.io.File;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -180,44 +180,7 @@ public void onStart(PipeSnapshot snapshot) {
applicationService.checkEnv(app);

// 2) some preparatory work
-    String appUploads = app.getWorkspace().APP_UPLOADS();
-
-    if (app.isCustomCodeJob()) {
-      // customCode upload jar to appHome...
-      FsOperator fsOperator = app.getFsOperator();
-      if (app.isCICDJob()) {
-        String appHome = app.getAppHome();
-        fsOperator.mkCleanDirs(appHome);
-        fsOperator.upload(app.getDistHome(), appHome);
-      } else {
-        File localJar = new File(WebUtils.getAppTempDir(), app.getJar());
-        // upload jar copy to appHome
-        String uploadJar = appUploads.concat("/").concat(app.getJar());
-        checkOrElseUploadJar(app.getFsOperator(), localJar, uploadJar, appUploads);
-        if (app.getApplicationType() == ApplicationType.STREAMPARK_FLINK) {
-          fsOperator.mkdirs(app.getAppLib());
-          fsOperator.copy(uploadJar, app.getAppLib(), false, true);
-        }
-      }
-    }
-
-    if (app.isFlinkSqlJob() || app.isUploadJob()) {
-      if (!app.getDependencyObject().getJar().isEmpty()) {
-        String localUploads = Workspace.local().APP_UPLOADS();
-        // copy jar to local upload dir
-        for (String jar : app.getDependencyObject().getJar()) {
-          File localJar = new File(WebUtils.getAppTempDir(), jar);
-          File uploadJar = new File(localUploads, jar);
-          if (!localJar.exists() && !uploadJar.exists()) {
-            throw new ApiAlertException("Missing file: " + jar + ", please upload again");
-          }
-          if (localJar.exists()) {
-            checkOrElseUploadJar(
-                FsOperator.lfs(), localJar, uploadJar.getAbsolutePath(), localUploads);
-          }
-        }
-      }
-    }
+    prepareJars(app);
}

@Override
@@ -325,24 +288,17 @@ public void onDockerPushProgressChange(DockerPushSnapshot snapshot) {
/** create building pipeline instance */
private BuildPipeline createPipelineInstance(@Nonnull Application app) {
FlinkEnv flinkEnv = flinkEnvService.getByIdOrDefault(app.getVersionId());
-    String flinkUserJar = retrieveFlinkUserJar(flinkEnv, app);
+    String userLocalJar = retrieveUserLocalJar(flinkEnv, app);
ExecutionMode executionMode = app.getExecutionModeEnum();
String mainClass =
app.isCustomCodeJob() ? app.getMainClass() : ConfigConst.STREAMPARK_FLINKSQL_CLIENT_CLASS();
switch (executionMode) {
case YARN_APPLICATION:
-        String yarnProvidedPath = app.getAppLib();
-        String localWorkspace = app.getLocalAppHome().concat("/lib");
-        if (app.getDevelopmentMode().equals(DevelopmentMode.CUSTOM_CODE)
-            && app.getApplicationType().equals(ApplicationType.APACHE_FLINK)) {
-          yarnProvidedPath = app.getAppHome();
-          localWorkspace = app.getLocalAppHome();
-        }
FlinkYarnApplicationBuildRequest yarnAppRequest =
new FlinkYarnApplicationBuildRequest(
app.getJobName(),
mainClass,
localWorkspace,
yarnProvidedPath,
app.getDevelopmentMode(),
app.getDependencyInfo());
@@ -356,7 +312,7 @@ private BuildPipeline createPipelineInstance(@Nonnull Application app) {
app.getJobName(),
app.getLocalAppHome(),
mainClass,
-            flinkUserJar,
+            userLocalJar,
app.getExecutionModeEnum(),
app.getDevelopmentMode(),
flinkEnv.getFlinkVersion(),
@@ -369,7 +325,7 @@ private BuildPipeline createPipelineInstance(@Nonnull Application app) {
app.getJobName(),
app.getLocalAppHome(),
mainClass,
-            flinkUserJar,
+            userLocalJar,
app.getExecutionModeEnum(),
app.getDevelopmentMode(),
flinkEnv.getFlinkVersion(),
@@ -384,7 +340,7 @@ private BuildPipeline createPipelineInstance(@Nonnull Application app) {
app.getJobName(),
app.getLocalAppHome(),
mainClass,
-            flinkUserJar,
+            userLocalJar,
app.getExecutionModeEnum(),
app.getDevelopmentMode(),
flinkEnv.getFlinkVersion(),
@@ -408,26 +364,104 @@ private BuildPipeline createPipelineInstance(@Nonnull Application app) {
}
}

+  private void prepareJars(Application app) {
+    File localUploadDIR = new File(Workspace.local().APP_UPLOADS());
+    if (!localUploadDIR.exists()) {
+      localUploadDIR.mkdirs();
+    }
+
+    FsOperator localFS = FsOperator.lfs();
+    // 1) copy the dependency jars to the local upload dir
+    if (app.isFlinkSqlJob() || app.isUploadJob()) {
+      if (!app.getDependencyObject().getJar().isEmpty()) {
+        for (String jar : app.getDependencyObject().getJar()) {
+          File localJar = new File(WebUtils.getAppTempDir(), jar);
+          File localUploadJar = new File(localUploadDIR, jar);
+          if (!localJar.exists() && !localUploadJar.exists()) {
+            throw new ApiAlertException("Missing file: " + jar + ", please upload again");
+          }
+          if (localJar.exists()) {
+            checkOrElseUploadJar(localFS, localJar, localUploadJar, localUploadDIR);
+          }
+        }
+      }
+    }
+
+    if (app.isCustomCodeJob()) {
+      // customCode job: upload the jar to appHome...
+      FsOperator fsOperator = app.getFsOperator();
+
+      if (app.isUploadJob()) {
+        // 1) upload the user jar to the local upload dir
+        File localJar = new File(WebUtils.getAppTempDir(), app.getJar());
+        File localUploadJar = new File(localUploadDIR, app.getJar());
+        checkOrElseUploadJar(localFS, localJar, localUploadJar, localUploadDIR);
+
+        // 2) copy the jar to the local $app_home/lib
+        boolean cleanUpload = false;
+        File libJar = new File(app.getLocalAppLib(), app.getJar());
+        if (!localFS.exists(app.getLocalAppLib())) {
+          cleanUpload = true;
+        } else {
+          if (libJar.exists()) {
+            if (!FileUtils.equals(localJar, libJar)) {
+              cleanUpload = true;
+            }
+          } else {
+            cleanUpload = true;
+          }
+        }
+
+        if (cleanUpload) {
+          localFS.mkCleanDirs(app.getLocalAppLib());
+          localFS.upload(localUploadJar.getAbsolutePath(), app.getLocalAppLib());
+        }
+
+        // 3) for YARN application mode, collect all jars and ship them to the remote lib
+        if (app.getExecutionModeEnum() == ExecutionMode.YARN_APPLICATION) {
+          List<File> jars = new ArrayList<>(0);
+
+          // 3.1) user jar
+          jars.add(libJar);
+
+          // 3.2) jar dependencies
+          app.getDependencyObject()
+              .getJar()
+              .forEach(jar -> jars.add(new File(localUploadDIR, jar)));
+
+          // 3.3) pom dependencies
+          if (!app.getDependencyInfo().mavenArts().isEmpty()) {
+            jars.addAll(MavenTool.resolveArtifactsAsJava(app.getDependencyInfo().mavenArts()));
+          }
+
+          // 3.4) upload all jars to the remote $app_home/lib
+          fsOperator.mkCleanDirs(app.getAppLib());
+          jars.forEach(jar -> fsOperator.upload(jar.getAbsolutePath(), app.getAppLib()));
+        }
+      } else {
+        String appHome = app.getAppHome();
+        fsOperator.mkCleanDirs(appHome);
+        fsOperator.upload(app.getDistHome(), appHome);
+      }
+    }
+  }
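
For a YARN-application job, prepareJars funnels three jar sources into one freshly cleaned remote lib directory: the user jar staged under the local $app_home/lib, uploaded jar dependencies from the local upload dir, and pom dependencies resolved through MavenTool. A compact illustration of that aggregation under assumed names and paths (demo.jar, udf.jar, one Maven artifact; none of these come from the PR):

// Illustration only: what ends up in the remote lib dir for a YARN
// application job. File names and paths are assumed for the example.
import java.util.ArrayList;
import java.util.List;

public class YarnLibAggregationDemo {
  public static void main(String[] args) {
    List<String> jars = new ArrayList<>();
    // 1) user jar, staged under the local $app_home/lib
    jars.add("/opt/streampark_workspace/app/demo/lib/demo.jar");
    // 2) uploaded jar dependency from the local upload dir
    jars.add("/opt/streampark_workspace/uploads/udf.jar");
    // 3) pom dependency resolved from the local Maven repository
    jars.add("/root/.m2/repository/com/mysql/mysql-connector-j/8.0.33/mysql-connector-j-8.0.33.jar");

    String appLib = "hdfs:///streampark/app/demo/lib"; // remote $app_home/lib (assumed)
    jars.forEach(jar -> System.out.println(jar + "  ->  " + appLib));
  }
}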

/** copy from {@link ApplicationServiceImpl#start(Application, boolean)} */
-  private String retrieveFlinkUserJar(FlinkEnv flinkEnv, Application app) {
+  private String retrieveUserLocalJar(FlinkEnv flinkEnv, Application app) {
switch (app.getDevelopmentMode()) {
case CUSTOM_CODE:
switch (app.getApplicationType()) {
case STREAMPARK_FLINK:
return String.format("%s/%s", app.getAppLib(), app.getModule().concat(".jar"));
return String.format("%s/%s", app.getLocalAppLib(), app.getModule().concat(".jar"));
case APACHE_FLINK:
return String.format("%s/%s", WebUtils.getAppTempDir(), app.getJar());
return String.format("%s/%s", app.getLocalAppLib(), app.getJar());
default:
throw new IllegalArgumentException(
"[StreamPark] unsupported ApplicationType of custom code: "
+ app.getApplicationType());
}
case FLINK_SQL:
String sqlDistJar = commonService.getSqlClientJar(flinkEnv);
if (app.getExecutionModeEnum() == ExecutionMode.YARN_APPLICATION) {
String clientPath = Workspace.remote().APP_CLIENT();
return String.format("%s/%s", clientPath, sqlDistJar);
}
return Workspace.local().APP_CLIENT().concat("/").concat(sqlDistJar);
default:
throw new UnsupportedOperationException(
@@ -487,13 +521,13 @@ public boolean saveEntity(AppBuildPipeline pipe) {
}

private void checkOrElseUploadJar(
-      FsOperator fsOperator, File localJar, String targetJar, String targetDir) {
-    if (!fsOperator.exists(targetJar)) {
-      fsOperator.upload(localJar.getAbsolutePath(), targetDir, false, true);
+      FsOperator fsOperator, File localJar, File targetJar, File targetDir) {
+    if (!fsOperator.exists(targetJar.getAbsolutePath())) {
+      fsOperator.upload(localJar.getAbsolutePath(), targetDir.getAbsolutePath());
} else {
      // The file already exists; check whether it is consistent, and re-upload it if not
-      if (!FileUtils.equals(localJar, new File(targetJar))) {
-        fsOperator.upload(localJar.getAbsolutePath(), targetDir, false, true);
+      if (!FileUtils.equals(localJar, targetJar)) {
+        fsOperator.upload(localJar.getAbsolutePath(), targetDir.getAbsolutePath());
}
}
}
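
checkOrElseUploadJar implements an idempotent "upload unless identical" step: upload when the target jar is missing, and re-upload when it exists but its content differs from the local one. The same idiom as a standalone JDK sketch (java.nio.file in place of the FsOperator/FileUtils helpers used above; reading whole files into memory is fine for a sketch, not for very large jars):

// Standalone sketch of the check-or-upload idiom on the local filesystem.
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Arrays;

public class CheckOrElseCopy {
  static void checkOrElseCopy(Path localJar, Path targetJar) throws IOException {
    boolean changed =
        Files.notExists(targetJar)
            || Files.size(localJar) != Files.size(targetJar)
            || !Arrays.equals(Files.readAllBytes(localJar), Files.readAllBytes(targetJar));
    if (changed) {
      // copy only when the target is missing or its content differs
      Files.createDirectories(targetJar.getParent());
      Files.copy(localJar, targetJar, StandardCopyOption.REPLACE_EXISTING);
    }
  }

  public static void main(String[] args) throws IOException {
    checkOrElseCopy(Path.of("/tmp/demo.jar"), Path.of("/tmp/uploads/demo.jar")); // example paths
  }
}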