Skip to content

Commit

Permalink
[Bug] Fix retrieval of job state when deploying a Flink job on YARN.
Browse files Browse the repository at this point in the history
  • Loading branch information
benjobs committed Nov 10, 2023
1 parent f23d515 commit 76a1b28
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import org.apache.http.client.methods.HttpGet
import org.apache.http.client.protocol.HttpClientContext
import org.apache.http.impl.client.HttpClients

import java.io.IOException
import java.net.InetAddress
import java.security.PrivilegedExceptionAction
import java.util
Expand Down Expand Up @@ -257,20 +258,19 @@ object YarnUtils extends Logger {
* url
* @return
*/
@throws[IOException]
def restRequest(url: String): String = {
if (url == null) return null

url match {
case u if u.matches("^http(|s)://.*") =>
Try(request(url)) match {
case Success(v) => v
case Failure(e) =>
if (hasYarnHttpKerberosAuth) {
logError(s"yarnUtils authRestRequest error, url: $u, detail: $e")
throw new IOException(s"yarnUtils authRestRequest error, url: $u, detail: $e")
} else {
logError(s"yarnUtils restRequest error, url: $u, detail: $e")
throw new IOException(s"yarnUtils restRequest error, url: $u, detail: $e")
}
null
}
case _ =>
Try(request(s"${getRMWebAppURL()}/$url")) match {
Expand All @@ -281,8 +281,7 @@ object YarnUtils extends Logger {
} match {
case Success(v) => v
case Failure(e) =>
logError(s"yarnUtils restRequest retry 5 times all failed. detail: $e")
null
throw new IOException(s"yarnUtils restRequest retry 5 times all failed. detail: $e")
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import org.apache.streampark.flink.packer.pipeline.impl.FlinkRemoteBuildPipeline;
import org.apache.streampark.flink.packer.pipeline.impl.FlinkYarnApplicationBuildPipeline;

import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.collections.CollectionUtils;

import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
Expand All @@ -86,6 +87,9 @@
import javax.annotation.Nonnull;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -164,7 +168,7 @@ public boolean buildApplication(@Nonnull Application app, ApplicationLog applica
pipeline.registerWatcher(
new PipeWatcher() {
@Override
public void onStart(PipeSnapshot snapshot) {
public void onStart(PipeSnapshot snapshot) throws Exception {
AppBuildPipeline buildPipeline =
AppBuildPipeline.fromPipeSnapshot(snapshot).setAppId(app.getId());
saveEntity(buildPipeline);
Expand Down Expand Up @@ -364,7 +368,7 @@ private BuildPipeline createPipelineInstance(@Nonnull Application app) {
}
}

private void prepareJars(Application app) {
private void prepareJars(Application app) throws IOException {
File localUploadDIR = new File(Workspace.local().APP_UPLOADS());
if (!localUploadDIR.exists()) {
localUploadDIR.mkdirs();
Expand Down Expand Up @@ -421,7 +425,7 @@ private void prepareJars(Application app) {
if (app.getExecutionModeEnum() == ExecutionMode.YARN_APPLICATION) {
List<File> jars = new ArrayList<>(0);

// 1) user jar
// 1). user jar
jars.add(libJar);

// 2). jar dependency
Expand All @@ -434,9 +438,24 @@ private void prepareJars(Application app) {
jars.addAll(MavenTool.resolveArtifactsAsJava(app.getDependencyInfo().mavenArts()));
}

// 4). local uploadDIR to hdfs uploadsDIR
String hdfsUploadDIR = Workspace.remote().APP_UPLOADS();
for (File jarFile : jars) {
String hdfsUploadPath = hdfsUploadDIR + "/" + jarFile.getName();
if (!fsOperator.exists(hdfsUploadPath)) {
fsOperator.upload(jarFile.getAbsolutePath(), hdfsUploadDIR);
} else {
InputStream inputStream = Files.newInputStream(jarFile.toPath());
if (!DigestUtils.md5Hex(inputStream).equals(fsOperator.fileMd5(hdfsUploadPath))) {
fsOperator.upload(jarFile.getAbsolutePath(), hdfsUploadDIR);
}
}
}

// 5). copy jars to $hdfs_app_home/lib
fsOperator.mkCleanDirs(app.getAppLib());
// 4). upload jars to appLibDIR
jars.forEach(jar -> fsOperator.upload(jar.getAbsolutePath(), app.getAppLib()));
jars.forEach(
jar -> fsOperator.copy(hdfsUploadDIR + "/" + jar.getName(), app.getAppLib()));
}
} else {
String appHome = app.getAppHome();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,6 @@ private void watch(Long key, Application application) {
STOP_FROM_MAP.getOrDefault(key, null) == null
? StopFrom.NONE
: STOP_FROM_MAP.get(key);
final OptionState optionState = OPTIONING.get(key);
try {
// query status from flink rest api
getFromFlinkRestApi(application, stopFrom);
Expand All @@ -220,37 +219,36 @@ private void watch(Long key, Application application) {
Queries against both Flink's REST API and YARN's REST API have failed.
In this case, whether the job should be moved to a final state depends on the operation currently in progress.
*/
if (optionState == null || !optionState.equals(OptionState.STARTING)) {
// non-mapping
if (application.getState() != FlinkAppState.MAPPING.getValue()) {
log.error(
"FlinkRESTAPIWatcher getFromFlinkRestApi and getFromYarnRestApi error,job failed,savePoint expired!");
if (StopFrom.NONE.equals(stopFrom)) {
savePointService.expire(application.getId());
application.setState(FlinkAppState.LOST.getValue());
alertService.alert(application, FlinkAppState.LOST);
} else {
application.setState(FlinkAppState.CANCELED.getValue());
}
// non-mapping
if (application.getState() != FlinkAppState.MAPPING.getValue()) {
log.error(
"FlinkRESTAPIWatcher getFromFlinkRestApi and getFromYarnRestApi error,job failed,savePoint expired!");
if (StopFrom.NONE.equals(stopFrom)) {
savePointService.expire(application.getId());
application.setState(FlinkAppState.LOST.getValue());
alertService.alert(application, FlinkAppState.LOST);
} else {
application.setState(FlinkAppState.CANCELED.getValue());
}
/*
Reaching this point means both ways of fetching the job information have failed.
As a last resort, this step directly marks the job as CANCELED or LOST.
The savepoint also needs to be cleaned up.
*/
application.setEndTime(new Date());
cleanSavepoint(application);
cleanOptioning(optionState, key);
doPersistMetrics(application, true);
FlinkAppState appState = FlinkAppState.of(application.getState());
if (appState.equals(FlinkAppState.FAILED) || appState.equals(FlinkAppState.LOST)) {
alertService.alert(application, FlinkAppState.of(application.getState()));
if (appState.equals(FlinkAppState.FAILED)) {
try {
applicationService.start(application, true);
} catch (Exception e) {
log.error(e.getMessage(), e);
}
}
/*
Reaching this point means both ways of fetching the job information have failed.
As a last resort, this step directly marks the job as CANCELED or LOST.
The savepoint also needs to be cleaned up.
*/
application.setEndTime(new Date());
cleanSavepoint(application);
OptionState optionState = OPTIONING.get(key);
cleanOptioning(optionState, key);
doPersistMetrics(application, true);
FlinkAppState appState = FlinkAppState.of(application.getState());
if (appState.equals(FlinkAppState.FAILED) || appState.equals(FlinkAppState.LOST)) {
alertService.alert(application, FlinkAppState.of(application.getState()));
if (appState.equals(FlinkAppState.FAILED)) {
try {
applicationService.start(application, true);
} catch (Exception e) {
log.error(e.getMessage(), e);
}
}
}
Expand Down Expand Up @@ -738,9 +736,6 @@ private CheckPoints httpCheckpoints(Application application) throws Exception {

private <T> T yarnRestRequest(String url, Class<T> clazz) throws IOException {
String result = YarnUtils.restRequest(url);
if (null == result) {
return null;
}
return JacksonUtils.read(result, clazz);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,28 +207,22 @@ trait FlinkClientTrait extends Logger {
jobGraphFunc(submitRequest, flinkConfig, jarFile)
} match {
case Failure(e) =>
logWarn(
s"""\n
|[flink-submit] JobGraph Submit Plan failed, error detail:
|------------------------------------------------------------------
|${Utils.stringifyException(e)}
|------------------------------------------------------------------
|Now retry submit with RestAPI Plan ...
|""".stripMargin
)
Try(restApiFunc(submitRequest, flinkConfig, jarFile)) match {
case Success(r) => r
case Failure(e) =>
logError(
case Failure(e1) =>
throw new RuntimeException(
s"""\n
|[flink-submit] RestAPI Submit failed, error detail:
|[flink-submit] Both JobGraph submit plan and Rest API submit plan all failed!
|JobGraph submit plan failed detail:
|------------------------------------------------------------------
|${Utils.stringifyException(e)}
|------------------------------------------------------------------
|Both JobGraph submit plan and Rest API submit plan all failed!
|""".stripMargin
)
throw e
|
| RestAPI Submit failed, error detail:
| ------------------------------------------------------------------
|${Utils.stringifyException(e1)}
|------------------------------------------------------------------
|""".stripMargin)
}
case Success(v) => v
}
Expand Down

0 comments on commit 76a1b28

Please sign in to comment.