[Improve] custom_code build pipeline bug fixed. (#3484)
Co-authored-by: benjobs <[email protected]>
wolfboys and benjobs authored Jan 10, 2024
1 parent 0734dce commit 97dfe6b
Showing 6 changed files with 49 additions and 47 deletions.

@@ -19,7 +19,6 @@
 
 import org.apache.streampark.common.conf.ConfigConst;
 import org.apache.streampark.common.conf.Workspace;
-import org.apache.streampark.common.enums.ApplicationType;
 import org.apache.streampark.common.enums.ExecutionMode;
 import org.apache.streampark.common.fs.FsOperator;
 import org.apache.streampark.common.util.FileUtils;
@@ -378,7 +377,7 @@ private void prepareJars(Application app) throws IOException {
 
     FsOperator localFS = FsOperator.lfs();
     // 1. copy jar to local upload dir
-    if (app.isFlinkSqlJob() || app.isApacheFlinkCustomCodeJob()) {
+    if (app.isFlinkSqlJob() || app.isCustomCodeJob()) {
       if (!app.getMavenDependency().getJar().isEmpty()) {
         for (String jar : app.getMavenDependency().getJar()) {
           File localJar = new File(WebUtils.getAppTempDir(), jar);
@@ -393,14 +392,14 @@ private void prepareJars(Application app) throws IOException {
       }
     }
 
-    if (app.isApacheFlinkCustomCodeJob()) {
+    if (app.isCustomCodeJob()) {
       // customCode upload jar to appHome...
       FsOperator fsOperator = app.getFsOperator();
       ResourceFrom resourceFrom = ResourceFrom.of(app.getResourceFrom());
 
       File userJar;
       if (resourceFrom == ResourceFrom.CICD) {
-        userJar = getAppDistJar(app);
+        userJar = getCustomCodeAppDistJar(app);
       } else if (resourceFrom == ResourceFrom.UPLOAD) {
         userJar = new File(WebUtils.getAppTempDir(), app.getJar());
       } else {
@@ -484,15 +483,17 @@ private void prepareJars(Application app) throws IOException {
     }
   }
 
-  private File getAppDistJar(Application app) {
-    if (app.getApplicationType() == ApplicationType.STREAMPARK_FLINK) {
-      return new File(app.getDistHome() + "/lib", app.getModule().concat(".jar"));
-    }
-    if (app.getApplicationType() == ApplicationType.APACHE_FLINK) {
-      return new File(app.getDistHome(), app.getJar());
+  private File getCustomCodeAppDistJar(Application app) {
+    switch (app.getApplicationType()) {
+      case APACHE_FLINK:
+        return new File(app.getDistHome(), app.getJar());
+      case STREAMPARK_FLINK:
+        String userJar = String.format("%s/lib/%s.jar", app.getDistHome(), app.getModule());
+        return new File(userJar);
+      default:
+        throw new IllegalArgumentException(
+            "[StreamPark] unsupported ApplicationType of custom code: " + app.getApplicationType());
     }
-    throw new IllegalArgumentException(
-        "[StreamPark] unsupported ApplicationType of custom code: " + app.getApplicationType());
   }
 
   /** copy from {@link ApplicationServiceImpl#start(Application, boolean)} */
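
The reworked jar resolution above is the core of the fix: every custom-code job now enters the prepareJars branch (isCustomCodeJob rather than isApacheFlinkCustomCodeJob), and a CICD-built job resolves its dist jar per ApplicationType in a single switch. The following is a minimal, self-contained sketch of that resolution logic only; the ApplicationType enum is a stand-in for the StreamPark one, and the paths in main are hypothetical:

```java
import java.io.File;

public class DistJarResolutionSketch {

  // stand-in for org.apache.streampark.common.enums.ApplicationType
  enum ApplicationType { APACHE_FLINK, STREAMPARK_FLINK }

  // mirrors the shape of getCustomCodeAppDistJar: pick the dist jar by application type
  static File resolveDistJar(ApplicationType type, String distHome, String jar, String module) {
    switch (type) {
      case APACHE_FLINK:
        // user-compiled Apache Flink job: the jar sits directly under the dist home
        return new File(distHome, jar);
      case STREAMPARK_FLINK:
        // StreamPark-built project: the module jar lands under <distHome>/lib
        return new File(String.format("%s/lib/%s.jar", distHome, module));
      default:
        throw new IllegalArgumentException("unsupported ApplicationType of custom code: " + type);
    }
  }

  public static void main(String[] args) {
    // hypothetical values, for illustration only
    System.out.println(resolveDistJar(ApplicationType.APACHE_FLINK, "/opt/dist", "my-job.jar", null));
    // -> /opt/dist/my-job.jar
    System.out.println(resolveDistJar(ApplicationType.STREAMPARK_FLINK, "/opt/dist", null, "quickstart"));
    // -> /opt/dist/lib/quickstart.jar
  }
}
```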

@@ -210,7 +210,7 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
 
   @Autowired private EnvInitializer envInitializer;
 
-  @Autowired private FlinkK8sWatcher k8SFlinkTrackMonitor;
+  @Autowired private FlinkK8sWatcher flinkK8sWatcher;
 
   @Autowired private AppBuildPipeService appBuildPipeService;
 
@@ -282,7 +282,7 @@ public Map<String, Serializable> dashboard(Long teamId) {
     }
 
     // merge metrics from flink kubernetes cluster
-    FlinkMetricCV k8sMetric = k8SFlinkTrackMonitor.getAccGroupMetrics(teamId.toString());
+    FlinkMetricCV k8sMetric = flinkK8sWatcher.getAccGroupMetrics(teamId.toString());
     if (k8sMetric != null) {
       totalJmMemory += k8sMetric.totalJmMemory();
       totalTmMemory += k8sMetric.totalTmMemory();
@@ -518,7 +518,7 @@ record -> {
           // in time.
           if (isKubernetesApp(record)) {
             // set duration
-            String restUrl = k8SFlinkTrackMonitor.getRemoteRestUrl(toTrackId(record));
+            String restUrl = flinkK8sWatcher.getRemoteRestUrl(toTrackId(record));
             record.setFlinkRestUrl(restUrl);
             if (record.getTracking() == 1
                 && record.getStartTime() != null
@@ -707,7 +707,7 @@ public AppExistsState checkExists(Application appParam) {
         }
         // check whether clusterId, namespace, jobId on kubernetes
         else if (ExecutionMode.isKubernetesMode(appParam.getExecutionMode())
-            && k8SFlinkTrackMonitor.checkIsInRemoteCluster(toTrackId(app))) {
+            && flinkK8sWatcher.checkIsInRemoteCluster(toTrackId(app))) {
           return AppExistsState.IN_KUBERNETES;
         }
       }
@@ -723,7 +723,7 @@ else if (ExecutionMode.isKubernetesMode(appParam.getExecutionMode())
         }
         // check whether clusterId, namespace, jobId on kubernetes
         else if (ExecutionMode.isKubernetesMode(appParam.getExecutionMode())
-            && k8SFlinkTrackMonitor.checkIsInRemoteCluster(toTrackId(appParam))) {
+            && flinkK8sWatcher.checkIsInRemoteCluster(toTrackId(appParam))) {
           return AppExistsState.IN_KUBERNETES;
         }
       }
@@ -1196,7 +1196,7 @@ public Application getApp(Application appParam) {
     }
     // add flink web url info for k8s-mode
     if (isKubernetesApp(application)) {
-      String restUrl = k8SFlinkTrackMonitor.getRemoteRestUrl(toTrackId(application));
+      String restUrl = flinkK8sWatcher.getRemoteRestUrl(toTrackId(application));
       application.setFlinkRestUrl(restUrl);
 
       // set duration
@@ -1233,7 +1233,7 @@ public boolean mapping(Application appParam) {
     boolean mapping = this.baseMapper.mapping(appParam);
     Application application = getById(appParam.getId());
     if (isKubernetesApp(application)) {
-      k8SFlinkTrackMonitor.doWatching(toTrackId(application));
+      flinkK8sWatcher.doWatching(toTrackId(application));
     } else {
       FlinkRESTAPIWatcher.doWatching(application);
     }
@@ -1354,7 +1354,7 @@ public void cancel(Application appParam) throws Exception {
     }
     // re-tracking flink job on kubernetes and logging exception
     if (isKubernetesApp(application)) {
-      k8SFlinkTrackMonitor.unWatching(trackId);
+      flinkK8sWatcher.unWatching(trackId);
     } else {
       FlinkRESTAPIWatcher.unWatching(application.getId());
     }
@@ -1380,7 +1380,7 @@ public void cancel(Application appParam) throws Exception {
           }
 
           if (isKubernetesApp(application)) {
-            k8SFlinkTrackMonitor.unWatching(trackId);
+            flinkK8sWatcher.unWatching(trackId);
           }
         });
   }
@@ -1620,7 +1620,7 @@ public void start(Application appParam, boolean auto) throws Exception {
       app.setOptionState(OptionState.NONE.getValue());
       updateById(app);
       if (isKubernetesApp(app)) {
-        k8SFlinkTrackMonitor.unWatching(trackId);
+        flinkK8sWatcher.unWatching(trackId);
       } else {
        FlinkRESTAPIWatcher.unWatching(appParam.getId());
      }
@@ -1656,7 +1656,7 @@ public void start(Application appParam, boolean auto) throws Exception {
     // if start completed, will be added task to tracking queue
     if (isKubernetesApp(application)) {
       application.setRelease(ReleaseState.DONE.get());
-      k8SFlinkTrackMonitor.doWatching(trackId);
+      flinkK8sWatcher.doWatching(trackId);
       if (ExecutionMode.isKubernetesApplicationMode(application.getExecutionMode())) {
         String domainName = settingService.getIngressModeDefault();
         if (StringUtils.isNotBlank(domainName)) {
@@ -1755,7 +1755,7 @@ private void doStopped(Application appParam) {
     // re-tracking flink job on kubernetes and logging exception
     if (isKubernetesApp(application)) {
       TrackId id = toTrackId(application);
-      k8SFlinkTrackMonitor.doWatching(id);
+      flinkK8sWatcher.doWatching(id);
     } else {
       FlinkRESTAPIWatcher.unWatching(application.getId());
     }

2 changes: 1 addition & 1 deletion streampark-console/streampark-console-webapp/package.json
@@ -49,7 +49,7 @@
     "@vueuse/shared": "^9.6.0",
     "@zxcvbn-ts/core": "^2.1.0",
     "ant-design-vue": "^3.2.15",
-    "axios": "^1.2.1",
+    "axios": "^1.6.5",
     "crypto-js": "^4.1.1",
     "dayjs": "^1.11.6",
     "lodash-es": "^4.17.21",

@@ -59,7 +59,7 @@
           />
         </a>
         <p class="text-light-100 pt-10px" style="border-top: 1px solid #dad7d7">
-          Copyright © 2023 The Apache Software Foundation. Apache StreamPark, StreamPark, and its
+          Copyright © 2022-{{ `${new Date().getFullYear()}` }} The Apache Software Foundation. Apache StreamPark, StreamPark, and its
           feather logo are trademarks of The Apache Software Foundation.
         </p>
       </footer>

@@ -21,7 +21,7 @@ import org.apache.streampark.flink.kubernetes.enums.FlinkJobState
 import org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteMode.{APPLICATION, SESSION}
 import org.apache.streampark.flink.kubernetes.event.{BuildInEvent, FlinkJobStateEvent, FlinkJobStatusChangeEvent}
 import org.apache.streampark.flink.kubernetes.model._
-import org.apache.streampark.flink.kubernetes.watcher.{FlinkCheckpointWatcher, FlinkJobStatusWatcher, FlinkK8sEventWatcher, FlinkMetricWatcher, FlinkWatcher}
+import org.apache.streampark.flink.kubernetes.watcher._
 
 import com.google.common.eventbus.{AllowConcurrentEvents, Subscribe}
 
@@ -42,10 +42,10 @@ class DefaultFlinkK8sWatcher(conf: FlinkTrackConfig = FlinkTrackConfig.defaultCo
   }
 
   // remote server tracking watcher
-  val k8sEventWatcher = new FlinkK8sEventWatcher()
-  val jobStatusWatcher = new FlinkJobStatusWatcher(conf.jobStatusWatcherConf)
-  val metricsWatcher = new FlinkMetricWatcher(conf.metricWatcherConf)
-  val checkpointWatcher = new FlinkCheckpointWatcher(conf.metricWatcherConf)
+  private val k8sEventWatcher = new FlinkK8sEventWatcher()
+  private val jobStatusWatcher = new FlinkJobStatusWatcher(conf.jobStatusWatcherConf)
+  private val metricsWatcher = new FlinkMetricWatcher(conf.metricWatcherConf)
+  private val checkpointWatcher = new FlinkCheckpointWatcher(conf.metricWatcherConf)
 
   private[this] val allWatchers =
     Array[FlinkWatcher](k8sEventWatcher, jobStatusWatcher, metricsWatcher, checkpointWatcher)
@@ -118,7 +118,7 @@ class DefaultFlinkK8sWatcher(conf: FlinkTrackConfig = FlinkTrackConfig.defaultCo
     watchController.endpoints.get(trackId.toClusterKey)
 
   /** Build-in Event Listener of K8sFlinkTrackMonitor. */
-  class BuildInEventListener {
+  private class BuildInEventListener {
 
     /**
      * Watch the FlinkJobOperaEvent, then update relevant cache record and trigger a new

@@ -31,7 +31,6 @@ import com.google.common.io.Files
 import org.apache.flink.core.fs.Path
 import org.apache.flink.runtime.history.FsJobArchivist
 import org.apache.hc.client5.http.fluent.Request
-import org.apache.hc.core5.util.Timeout
 import org.json4s.{DefaultFormats, JNothing, JNull}
 import org.json4s.JsonAST.JArray
 import org.json4s.jackson.JsonMethods.parse
@@ -116,9 +115,13 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfi
         case Some(jobState) =>
           val trackId = id.copy(jobId = jobState.jobId)
           val latest: JobStatusCV = watchController.jobStatuses.get(trackId)
-          if (
-            latest == null || latest.jobState != jobState.jobState || latest.jobId != jobState.jobId
-          ) {
+
+          val eventChanged = latest == null ||
+            latest.jobState != jobState.jobState ||
+            latest.jobId != jobState.jobId
+
+          if (eventChanged) {
+            logInfo(s"eventChanged.....$trackId")
             // put job status to cache
             watchController.jobStatuses.put(trackId, jobState)
             // set jobId to trackIds
@@ -193,7 +196,6 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfi
             pollAckTime = System.currentTimeMillis)
         }
       }
-
       Some(jobState)
     }
 
@@ -204,7 +206,7 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfi
    * This method can be called directly from outside, without affecting the current cachePool
    * result.
    */
-  protected[kubernetes] def touchSessionAllJob(
+  private def touchSessionAllJob(
       @Nonnull clusterId: String,
       @Nonnull namespace: String,
       @Nonnull appId: Long,
@@ -219,8 +221,8 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfi
     } else {
       jobDetails.map {
         d =>
-          TrackId.onSession(namespace, clusterId, appId, d.jid, groupId) -> d
-            .toJobStatusCV(pollEmitTime, System.currentTimeMillis)
+          TrackId.onSession(namespace, clusterId, appId, d.jid, groupId) ->
+            d.toJobStatusCV(pollEmitTime, System.currentTimeMillis)
       }
     }
   }
@@ -238,7 +240,7 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfi
     val namespace = trackId.namespace
     val jobDetails = listJobsDetails(ClusterKey(APPLICATION, namespace, clusterId))
     if (jobDetails.isEmpty || jobDetails.get.jobs.isEmpty) {
-      inferApplicationFlinkJobStateFromK8sEvent(trackId)
+      inferJobStateFromK8sEvent(trackId)
     } else {
       Some(jobDetails.get.jobs.head.toJobStatusCV(pollEmitTime, System.currentTimeMillis))
     }
@@ -284,7 +286,7 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfi
    * Infer the current flink state from the last relevant k8s events. This method is only used for
    * application-mode job inference in case of a failed JM rest request.
    */
-  private def inferApplicationFlinkJobStateFromK8sEvent(@Nonnull trackId: TrackId)(implicit
+  private def inferJobStateFromK8sEvent(@Nonnull trackId: TrackId)(implicit
       pollEmitTime: Long): Option[JobStatusCV] = {
 
     // infer from k8s deployment and event
@@ -301,14 +303,13 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfi
       trackId.clusterId
     )
 
-    val deployError = KubernetesDeploymentHelper.isDeploymentError(
-      trackId.namespace,
-      trackId.clusterId
-    )
-
     val isConnection = KubernetesDeploymentHelper.checkConnection()
 
     if (deployExists) {
+      val deployError = KubernetesDeploymentHelper.isDeploymentError(
+        trackId.namespace,
+        trackId.clusterId
+      )
      if (!deployError) {
        logger.info("Task Enter the initialization process.")
        FlinkJobState.K8S_INITIALIZING
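
The eventChanged extraction in FlinkJobStatusWatcher above names a compare-cache-then-publish guard: a freshly polled job status is written back and broadcast only when it differs from the cached value. Below is a rough Java analogue of that pattern, assuming hypothetical JobStatus and publish stand-ins rather than the real Scala watcher API:

```java
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

public class ChangeDetectionSketch {

  // stand-in for the watcher's cached job-status value
  record JobStatus(String jobId, String jobState) {}

  private final Map<String, JobStatus> cache = new ConcurrentHashMap<>();

  void onPoll(String trackId, JobStatus fresh) {
    JobStatus latest = cache.get(trackId);
    // mirrors the extracted eventChanged condition in the diff
    boolean eventChanged = latest == null
        || !Objects.equals(latest.jobState(), fresh.jobState())
        || !Objects.equals(latest.jobId(), fresh.jobId());
    if (eventChanged) {
      cache.put(trackId, fresh); // refresh the cache first
      publish(trackId, fresh);   // then notify downstream listeners
    }
  }

  private void publish(String trackId, JobStatus status) {
    System.out.println("status changed for " + trackId + ": " + status);
  }
}
```

The same hunk set also moves the isDeploymentError lookup inside the deployExists branch, so that extra Kubernetes query is skipped when the deployment is already gone.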
