Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Improve] custom_code build pipeline bug fixed. #3484

Merged
merged 2 commits into from
Jan 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import org.apache.streampark.common.conf.ConfigConst;
import org.apache.streampark.common.conf.Workspace;
import org.apache.streampark.common.enums.ApplicationType;
import org.apache.streampark.common.enums.ExecutionMode;
import org.apache.streampark.common.fs.FsOperator;
import org.apache.streampark.common.util.FileUtils;
Expand Down Expand Up @@ -378,7 +377,7 @@ private void prepareJars(Application app) throws IOException {

FsOperator localFS = FsOperator.lfs();
// 1. copy jar to local upload dir
if (app.isFlinkSqlJob() || app.isApacheFlinkCustomCodeJob()) {
if (app.isFlinkSqlJob() || app.isCustomCodeJob()) {
if (!app.getMavenDependency().getJar().isEmpty()) {
for (String jar : app.getMavenDependency().getJar()) {
File localJar = new File(WebUtils.getAppTempDir(), jar);
Expand All @@ -393,14 +392,14 @@ private void prepareJars(Application app) throws IOException {
}
}

if (app.isApacheFlinkCustomCodeJob()) {
if (app.isCustomCodeJob()) {
// customCode upload jar to appHome...
FsOperator fsOperator = app.getFsOperator();
ResourceFrom resourceFrom = ResourceFrom.of(app.getResourceFrom());

File userJar;
if (resourceFrom == ResourceFrom.CICD) {
userJar = getAppDistJar(app);
userJar = getCustomCodeAppDistJar(app);
} else if (resourceFrom == ResourceFrom.UPLOAD) {
userJar = new File(WebUtils.getAppTempDir(), app.getJar());
} else {
Expand Down Expand Up @@ -484,15 +483,17 @@ private void prepareJars(Application app) throws IOException {
}
}

private File getAppDistJar(Application app) {
if (app.getApplicationType() == ApplicationType.STREAMPARK_FLINK) {
return new File(app.getDistHome() + "/lib", app.getModule().concat(".jar"));
}
if (app.getApplicationType() == ApplicationType.APACHE_FLINK) {
return new File(app.getDistHome(), app.getJar());
private File getCustomCodeAppDistJar(Application app) {
switch (app.getApplicationType()) {
case APACHE_FLINK:
return new File(app.getDistHome(), app.getJar());
case STREAMPARK_FLINK:
String userJar = String.format("%s/lib/%s.jar", app.getDistHome(), app.getModule());
return new File(userJar);
default:
throw new IllegalArgumentException(
"[StreamPark] unsupported ApplicationType of custom code: " + app.getApplicationType());
}
throw new IllegalArgumentException(
"[StreamPark] unsupported ApplicationType of custom code: " + app.getApplicationType());
}

/** copy from {@link ApplicationServiceImpl#start(Application, boolean)} */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli

@Autowired private EnvInitializer envInitializer;

@Autowired private FlinkK8sWatcher k8SFlinkTrackMonitor;
@Autowired private FlinkK8sWatcher flinkK8sWatcher;

@Autowired private AppBuildPipeService appBuildPipeService;

Expand Down Expand Up @@ -282,7 +282,7 @@ public Map<String, Serializable> dashboard(Long teamId) {
}

// merge metrics from flink kubernetes cluster
FlinkMetricCV k8sMetric = k8SFlinkTrackMonitor.getAccGroupMetrics(teamId.toString());
FlinkMetricCV k8sMetric = flinkK8sWatcher.getAccGroupMetrics(teamId.toString());
if (k8sMetric != null) {
totalJmMemory += k8sMetric.totalJmMemory();
totalTmMemory += k8sMetric.totalTmMemory();
Expand Down Expand Up @@ -518,7 +518,7 @@ record -> {
// in time.
if (isKubernetesApp(record)) {
// set duration
String restUrl = k8SFlinkTrackMonitor.getRemoteRestUrl(toTrackId(record));
String restUrl = flinkK8sWatcher.getRemoteRestUrl(toTrackId(record));
record.setFlinkRestUrl(restUrl);
if (record.getTracking() == 1
&& record.getStartTime() != null
Expand Down Expand Up @@ -707,7 +707,7 @@ public AppExistsState checkExists(Application appParam) {
}
// check whether clusterId, namespace, jobId on kubernetes
else if (ExecutionMode.isKubernetesMode(appParam.getExecutionMode())
&& k8SFlinkTrackMonitor.checkIsInRemoteCluster(toTrackId(app))) {
&& flinkK8sWatcher.checkIsInRemoteCluster(toTrackId(app))) {
return AppExistsState.IN_KUBERNETES;
}
}
Expand All @@ -723,7 +723,7 @@ else if (ExecutionMode.isKubernetesMode(appParam.getExecutionMode())
}
// check whether clusterId, namespace, jobId on kubernetes
else if (ExecutionMode.isKubernetesMode(appParam.getExecutionMode())
&& k8SFlinkTrackMonitor.checkIsInRemoteCluster(toTrackId(appParam))) {
&& flinkK8sWatcher.checkIsInRemoteCluster(toTrackId(appParam))) {
return AppExistsState.IN_KUBERNETES;
}
}
Expand Down Expand Up @@ -1196,7 +1196,7 @@ public Application getApp(Application appParam) {
}
// add flink web url info for k8s-mode
if (isKubernetesApp(application)) {
String restUrl = k8SFlinkTrackMonitor.getRemoteRestUrl(toTrackId(application));
String restUrl = flinkK8sWatcher.getRemoteRestUrl(toTrackId(application));
application.setFlinkRestUrl(restUrl);

// set duration
Expand Down Expand Up @@ -1233,7 +1233,7 @@ public boolean mapping(Application appParam) {
boolean mapping = this.baseMapper.mapping(appParam);
Application application = getById(appParam.getId());
if (isKubernetesApp(application)) {
k8SFlinkTrackMonitor.doWatching(toTrackId(application));
flinkK8sWatcher.doWatching(toTrackId(application));
} else {
FlinkRESTAPIWatcher.doWatching(application);
}
Expand Down Expand Up @@ -1354,7 +1354,7 @@ public void cancel(Application appParam) throws Exception {
}
// re-tracking flink job on kubernetes and logging exception
if (isKubernetesApp(application)) {
k8SFlinkTrackMonitor.unWatching(trackId);
flinkK8sWatcher.unWatching(trackId);
} else {
FlinkRESTAPIWatcher.unWatching(application.getId());
}
Expand All @@ -1380,7 +1380,7 @@ public void cancel(Application appParam) throws Exception {
}

if (isKubernetesApp(application)) {
k8SFlinkTrackMonitor.unWatching(trackId);
flinkK8sWatcher.unWatching(trackId);
}
});
}
Expand Down Expand Up @@ -1620,7 +1620,7 @@ public void start(Application appParam, boolean auto) throws Exception {
app.setOptionState(OptionState.NONE.getValue());
updateById(app);
if (isKubernetesApp(app)) {
k8SFlinkTrackMonitor.unWatching(trackId);
flinkK8sWatcher.unWatching(trackId);
} else {
FlinkRESTAPIWatcher.unWatching(appParam.getId());
}
Expand Down Expand Up @@ -1656,7 +1656,7 @@ public void start(Application appParam, boolean auto) throws Exception {
// if start completed, will be added task to tracking queue
if (isKubernetesApp(application)) {
application.setRelease(ReleaseState.DONE.get());
k8SFlinkTrackMonitor.doWatching(trackId);
flinkK8sWatcher.doWatching(trackId);
if (ExecutionMode.isKubernetesApplicationMode(application.getExecutionMode())) {
String domainName = settingService.getIngressModeDefault();
if (StringUtils.isNotBlank(domainName)) {
Expand Down Expand Up @@ -1755,7 +1755,7 @@ private void doStopped(Application appParam) {
// re-tracking flink job on kubernetes and logging exception
if (isKubernetesApp(application)) {
TrackId id = toTrackId(application);
k8SFlinkTrackMonitor.doWatching(id);
flinkK8sWatcher.doWatching(id);
} else {
FlinkRESTAPIWatcher.unWatching(application.getId());
}
Expand Down
2 changes: 1 addition & 1 deletion streampark-console/streampark-console-webapp/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
"@vueuse/shared": "^9.6.0",
"@zxcvbn-ts/core": "^2.1.0",
"ant-design-vue": "^3.2.15",
"axios": "^1.2.1",
"axios": "^1.6.5",
"crypto-js": "^4.1.1",
"dayjs": "^1.11.6",
"lodash-es": "^4.17.21",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@
/>
</a>
<p class="text-light-100 pt-10px" style="border-top: 1px solid #dad7d7">
Copyright © 2023 The Apache Software Foundation. Apache StreamPark, StreamPark, and its
Copyright © 2022-{{ `${new Date().getFullYear()}` }} The Apache Software Foundation. Apache StreamPark, StreamPark, and its
feather logo are trademarks of The Apache Software Foundation.
</p>
</footer>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import org.apache.streampark.flink.kubernetes.enums.FlinkJobState
import org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteMode.{APPLICATION, SESSION}
import org.apache.streampark.flink.kubernetes.event.{BuildInEvent, FlinkJobStateEvent, FlinkJobStatusChangeEvent}
import org.apache.streampark.flink.kubernetes.model._
import org.apache.streampark.flink.kubernetes.watcher.{FlinkCheckpointWatcher, FlinkJobStatusWatcher, FlinkK8sEventWatcher, FlinkMetricWatcher, FlinkWatcher}
import org.apache.streampark.flink.kubernetes.watcher._

import com.google.common.eventbus.{AllowConcurrentEvents, Subscribe}

Expand All @@ -42,10 +42,10 @@ class DefaultFlinkK8sWatcher(conf: FlinkTrackConfig = FlinkTrackConfig.defaultCo
}

// remote server tracking watcher
val k8sEventWatcher = new FlinkK8sEventWatcher()
val jobStatusWatcher = new FlinkJobStatusWatcher(conf.jobStatusWatcherConf)
val metricsWatcher = new FlinkMetricWatcher(conf.metricWatcherConf)
val checkpointWatcher = new FlinkCheckpointWatcher(conf.metricWatcherConf)
private val k8sEventWatcher = new FlinkK8sEventWatcher()
private val jobStatusWatcher = new FlinkJobStatusWatcher(conf.jobStatusWatcherConf)
private val metricsWatcher = new FlinkMetricWatcher(conf.metricWatcherConf)
private val checkpointWatcher = new FlinkCheckpointWatcher(conf.metricWatcherConf)

private[this] val allWatchers =
Array[FlinkWatcher](k8sEventWatcher, jobStatusWatcher, metricsWatcher, checkpointWatcher)
Expand Down Expand Up @@ -118,7 +118,7 @@ class DefaultFlinkK8sWatcher(conf: FlinkTrackConfig = FlinkTrackConfig.defaultCo
watchController.endpoints.get(trackId.toClusterKey)

/** Build-in Event Listener of K8sFlinkTrackMonitor. */
class BuildInEventListener {
private class BuildInEventListener {

/**
* Watch the FlinkJobOperaEvent, then update relevant cache record and trigger a new
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import com.google.common.io.Files
import org.apache.flink.core.fs.Path
import org.apache.flink.runtime.history.FsJobArchivist
import org.apache.hc.client5.http.fluent.Request
import org.apache.hc.core5.util.Timeout
import org.json4s.{DefaultFormats, JNothing, JNull}
import org.json4s.JsonAST.JArray
import org.json4s.jackson.JsonMethods.parse
Expand Down Expand Up @@ -116,9 +115,13 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfi
case Some(jobState) =>
val trackId = id.copy(jobId = jobState.jobId)
val latest: JobStatusCV = watchController.jobStatuses.get(trackId)
if (
latest == null || latest.jobState != jobState.jobState || latest.jobId != jobState.jobId
) {

val eventChanged = latest == null ||
latest.jobState != jobState.jobState ||
latest.jobId != jobState.jobId

if (eventChanged) {
logInfo(s"eventChanged.....$trackId")
// put job status to cache
watchController.jobStatuses.put(trackId, jobState)
// set jobId to trackIds
Expand Down Expand Up @@ -193,7 +196,6 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfi
pollAckTime = System.currentTimeMillis)
}
}

Some(jobState)
}

Expand All @@ -204,7 +206,7 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfi
* This method can be called directly from outside, without affecting the current cachePool
* result.
*/
protected[kubernetes] def touchSessionAllJob(
private def touchSessionAllJob(
@Nonnull clusterId: String,
@Nonnull namespace: String,
@Nonnull appId: Long,
Expand All @@ -219,8 +221,8 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfi
} else {
jobDetails.map {
d =>
TrackId.onSession(namespace, clusterId, appId, d.jid, groupId) -> d
.toJobStatusCV(pollEmitTime, System.currentTimeMillis)
TrackId.onSession(namespace, clusterId, appId, d.jid, groupId) ->
d.toJobStatusCV(pollEmitTime, System.currentTimeMillis)
}
}
}
Expand All @@ -238,7 +240,7 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfi
val namespace = trackId.namespace
val jobDetails = listJobsDetails(ClusterKey(APPLICATION, namespace, clusterId))
if (jobDetails.isEmpty || jobDetails.get.jobs.isEmpty) {
inferApplicationFlinkJobStateFromK8sEvent(trackId)
inferJobStateFromK8sEvent(trackId)
} else {
Some(jobDetails.get.jobs.head.toJobStatusCV(pollEmitTime, System.currentTimeMillis))
}
Expand Down Expand Up @@ -284,7 +286,7 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfi
* Infer the current flink state from the last relevant k8s events. This method is only used for
* application-mode job inference in case of a failed JM rest request.
*/
private def inferApplicationFlinkJobStateFromK8sEvent(@Nonnull trackId: TrackId)(implicit
private def inferJobStateFromK8sEvent(@Nonnull trackId: TrackId)(implicit
pollEmitTime: Long): Option[JobStatusCV] = {

// infer from k8s deployment and event
Expand All @@ -301,14 +303,13 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfi
trackId.clusterId
)

val deployError = KubernetesDeploymentHelper.isDeploymentError(
trackId.namespace,
trackId.clusterId
)

val isConnection = KubernetesDeploymentHelper.checkConnection()

if (deployExists) {
val deployError = KubernetesDeploymentHelper.isDeploymentError(
trackId.namespace,
trackId.clusterId
)
if (!deployError) {
logger.info("Task Enter the initialization process.")
FlinkJobState.K8S_INITIALIZING
Expand Down
Loading