diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java index a002a85850..6834d940df 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java @@ -19,7 +19,6 @@ import org.apache.streampark.common.conf.ConfigConst; import org.apache.streampark.common.conf.Workspace; -import org.apache.streampark.common.enums.ApplicationType; import org.apache.streampark.common.enums.ExecutionMode; import org.apache.streampark.common.fs.FsOperator; import org.apache.streampark.common.util.FileUtils; @@ -378,7 +377,7 @@ private void prepareJars(Application app) throws IOException { FsOperator localFS = FsOperator.lfs(); // 1. copy jar to local upload dir - if (app.isFlinkSqlJob() || app.isApacheFlinkCustomCodeJob()) { + if (app.isFlinkSqlJob() || app.isCustomCodeJob()) { if (!app.getMavenDependency().getJar().isEmpty()) { for (String jar : app.getMavenDependency().getJar()) { File localJar = new File(WebUtils.getAppTempDir(), jar); @@ -393,14 +392,14 @@ private void prepareJars(Application app) throws IOException { } } - if (app.isApacheFlinkCustomCodeJob()) { + if (app.isCustomCodeJob()) { // customCode upload jar to appHome... FsOperator fsOperator = app.getFsOperator(); ResourceFrom resourceFrom = ResourceFrom.of(app.getResourceFrom()); File userJar; if (resourceFrom == ResourceFrom.CICD) { - userJar = getAppDistJar(app); + userJar = getCustomCodeAppDistJar(app); } else if (resourceFrom == ResourceFrom.UPLOAD) { userJar = new File(WebUtils.getAppTempDir(), app.getJar()); } else { @@ -484,15 +483,17 @@ private void prepareJars(Application app) throws IOException { } } - private File getAppDistJar(Application app) { - if (app.getApplicationType() == ApplicationType.STREAMPARK_FLINK) { - return new File(app.getDistHome() + "/lib", app.getModule().concat(".jar")); - } - if (app.getApplicationType() == ApplicationType.APACHE_FLINK) { - return new File(app.getDistHome(), app.getJar()); + private File getCustomCodeAppDistJar(Application app) { + switch (app.getApplicationType()) { + case APACHE_FLINK: + return new File(app.getDistHome(), app.getJar()); + case STREAMPARK_FLINK: + String userJar = String.format("%s/lib/%s.jar", app.getDistHome(), app.getModule()); + return new File(userJar); + default: + throw new IllegalArgumentException( + "[StreamPark] unsupported ApplicationType of custom code: " + app.getApplicationType()); } - throw new IllegalArgumentException( - "[StreamPark] unsupported ApplicationType of custom code: " + app.getApplicationType()); } /** copy from {@link ApplicationServiceImpl#start(Application, boolean)} */ diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java index ed55c27cc0..869e79afdf 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java @@ -210,7 +210,7 @@ public class ApplicationServiceImpl extends ServiceImpl dashboard(Long teamId) { } // merge metrics from flink kubernetes cluster - FlinkMetricCV k8sMetric = k8SFlinkTrackMonitor.getAccGroupMetrics(teamId.toString()); + FlinkMetricCV k8sMetric = flinkK8sWatcher.getAccGroupMetrics(teamId.toString()); if (k8sMetric != null) { totalJmMemory += k8sMetric.totalJmMemory(); totalTmMemory += k8sMetric.totalTmMemory(); @@ -518,7 +518,7 @@ record -> { // in time. if (isKubernetesApp(record)) { // set duration - String restUrl = k8SFlinkTrackMonitor.getRemoteRestUrl(toTrackId(record)); + String restUrl = flinkK8sWatcher.getRemoteRestUrl(toTrackId(record)); record.setFlinkRestUrl(restUrl); if (record.getTracking() == 1 && record.getStartTime() != null @@ -707,7 +707,7 @@ public AppExistsState checkExists(Application appParam) { } // check whether clusterId, namespace, jobId on kubernetes else if (ExecutionMode.isKubernetesMode(appParam.getExecutionMode()) - && k8SFlinkTrackMonitor.checkIsInRemoteCluster(toTrackId(app))) { + && flinkK8sWatcher.checkIsInRemoteCluster(toTrackId(app))) { return AppExistsState.IN_KUBERNETES; } } @@ -723,7 +723,7 @@ else if (ExecutionMode.isKubernetesMode(appParam.getExecutionMode()) } // check whether clusterId, namespace, jobId on kubernetes else if (ExecutionMode.isKubernetesMode(appParam.getExecutionMode()) - && k8SFlinkTrackMonitor.checkIsInRemoteCluster(toTrackId(appParam))) { + && flinkK8sWatcher.checkIsInRemoteCluster(toTrackId(appParam))) { return AppExistsState.IN_KUBERNETES; } } @@ -1196,7 +1196,7 @@ public Application getApp(Application appParam) { } // add flink web url info for k8s-mode if (isKubernetesApp(application)) { - String restUrl = k8SFlinkTrackMonitor.getRemoteRestUrl(toTrackId(application)); + String restUrl = flinkK8sWatcher.getRemoteRestUrl(toTrackId(application)); application.setFlinkRestUrl(restUrl); // set duration @@ -1233,7 +1233,7 @@ public boolean mapping(Application appParam) { boolean mapping = this.baseMapper.mapping(appParam); Application application = getById(appParam.getId()); if (isKubernetesApp(application)) { - k8SFlinkTrackMonitor.doWatching(toTrackId(application)); + flinkK8sWatcher.doWatching(toTrackId(application)); } else { FlinkRESTAPIWatcher.doWatching(application); } @@ -1354,7 +1354,7 @@ public void cancel(Application appParam) throws Exception { } // re-tracking flink job on kubernetes and logging exception if (isKubernetesApp(application)) { - k8SFlinkTrackMonitor.unWatching(trackId); + flinkK8sWatcher.unWatching(trackId); } else { FlinkRESTAPIWatcher.unWatching(application.getId()); } @@ -1380,7 +1380,7 @@ public void cancel(Application appParam) throws Exception { } if (isKubernetesApp(application)) { - k8SFlinkTrackMonitor.unWatching(trackId); + flinkK8sWatcher.unWatching(trackId); } }); } @@ -1620,7 +1620,7 @@ public void start(Application appParam, boolean auto) throws Exception { app.setOptionState(OptionState.NONE.getValue()); updateById(app); if (isKubernetesApp(app)) { - k8SFlinkTrackMonitor.unWatching(trackId); + flinkK8sWatcher.unWatching(trackId); } else { FlinkRESTAPIWatcher.unWatching(appParam.getId()); } @@ -1656,7 +1656,7 @@ public void start(Application appParam, boolean auto) throws Exception { // if start completed, will be added task to tracking queue if (isKubernetesApp(application)) { application.setRelease(ReleaseState.DONE.get()); - k8SFlinkTrackMonitor.doWatching(trackId); + flinkK8sWatcher.doWatching(trackId); if (ExecutionMode.isKubernetesApplicationMode(application.getExecutionMode())) { String domainName = settingService.getIngressModeDefault(); if (StringUtils.isNotBlank(domainName)) { @@ -1755,7 +1755,7 @@ private void doStopped(Application appParam) { // re-tracking flink job on kubernetes and logging exception if (isKubernetesApp(application)) { TrackId id = toTrackId(application); - k8SFlinkTrackMonitor.doWatching(id); + flinkK8sWatcher.doWatching(id); } else { FlinkRESTAPIWatcher.unWatching(application.getId()); } diff --git a/streampark-console/streampark-console-webapp/package.json b/streampark-console/streampark-console-webapp/package.json index 9d2d435544..8622c067f7 100644 --- a/streampark-console/streampark-console-webapp/package.json +++ b/streampark-console/streampark-console-webapp/package.json @@ -49,7 +49,7 @@ "@vueuse/shared": "^9.6.0", "@zxcvbn-ts/core": "^2.1.0", "ant-design-vue": "^3.2.15", - "axios": "^1.2.1", + "axios": "^1.6.5", "crypto-js": "^4.1.1", "dayjs": "^1.11.6", "lodash-es": "^4.17.21", diff --git a/streampark-console/streampark-console-webapp/src/views/base/login/Login.vue b/streampark-console/streampark-console-webapp/src/views/base/login/Login.vue index 8daa7445d2..2e0598e683 100644 --- a/streampark-console/streampark-console-webapp/src/views/base/login/Login.vue +++ b/streampark-console/streampark-console-webapp/src/views/base/login/Login.vue @@ -59,7 +59,7 @@ />

- Copyright © 2023 The Apache Software Foundation. Apache StreamPark, StreamPark, and its + Copyright © 2022-{{ `${new Date().getFullYear()}` }} The Apache Software Foundation. Apache StreamPark, StreamPark, and its feather logo are trademarks of The Apache Software Foundation.

diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/DefaultFlinkK8sWatcher.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/DefaultFlinkK8sWatcher.scala index e9a3b06e1e..4e3d808c44 100644 --- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/DefaultFlinkK8sWatcher.scala +++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/DefaultFlinkK8sWatcher.scala @@ -21,7 +21,7 @@ import org.apache.streampark.flink.kubernetes.enums.FlinkJobState import org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteMode.{APPLICATION, SESSION} import org.apache.streampark.flink.kubernetes.event.{BuildInEvent, FlinkJobStateEvent, FlinkJobStatusChangeEvent} import org.apache.streampark.flink.kubernetes.model._ -import org.apache.streampark.flink.kubernetes.watcher.{FlinkCheckpointWatcher, FlinkJobStatusWatcher, FlinkK8sEventWatcher, FlinkMetricWatcher, FlinkWatcher} +import org.apache.streampark.flink.kubernetes.watcher._ import com.google.common.eventbus.{AllowConcurrentEvents, Subscribe} @@ -42,10 +42,10 @@ class DefaultFlinkK8sWatcher(conf: FlinkTrackConfig = FlinkTrackConfig.defaultCo } // remote server tracking watcher - val k8sEventWatcher = new FlinkK8sEventWatcher() - val jobStatusWatcher = new FlinkJobStatusWatcher(conf.jobStatusWatcherConf) - val metricsWatcher = new FlinkMetricWatcher(conf.metricWatcherConf) - val checkpointWatcher = new FlinkCheckpointWatcher(conf.metricWatcherConf) + private val k8sEventWatcher = new FlinkK8sEventWatcher() + private val jobStatusWatcher = new FlinkJobStatusWatcher(conf.jobStatusWatcherConf) + private val metricsWatcher = new FlinkMetricWatcher(conf.metricWatcherConf) + private val checkpointWatcher = new FlinkCheckpointWatcher(conf.metricWatcherConf) private[this] val allWatchers = Array[FlinkWatcher](k8sEventWatcher, jobStatusWatcher, metricsWatcher, checkpointWatcher) @@ -118,7 +118,7 @@ class DefaultFlinkK8sWatcher(conf: FlinkTrackConfig = FlinkTrackConfig.defaultCo watchController.endpoints.get(trackId.toClusterKey) /** Build-in Event Listener of K8sFlinkTrackMonitor. */ - class BuildInEventListener { + private class BuildInEventListener { /** * Watch the FlinkJobOperaEvent, then update relevant cache record and trigger a new diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala index 6d731fe831..eec41ebd62 100644 --- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala +++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala @@ -31,7 +31,6 @@ import com.google.common.io.Files import org.apache.flink.core.fs.Path import org.apache.flink.runtime.history.FsJobArchivist import org.apache.hc.client5.http.fluent.Request -import org.apache.hc.core5.util.Timeout import org.json4s.{DefaultFormats, JNothing, JNull} import org.json4s.JsonAST.JArray import org.json4s.jackson.JsonMethods.parse @@ -116,9 +115,13 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfi case Some(jobState) => val trackId = id.copy(jobId = jobState.jobId) val latest: JobStatusCV = watchController.jobStatuses.get(trackId) - if ( - latest == null || latest.jobState != jobState.jobState || latest.jobId != jobState.jobId - ) { + + val eventChanged = latest == null || + latest.jobState != jobState.jobState || + latest.jobId != jobState.jobId + + if (eventChanged) { + logInfo(s"eventChanged.....$trackId") // put job status to cache watchController.jobStatuses.put(trackId, jobState) // set jobId to trackIds @@ -193,7 +196,6 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfi pollAckTime = System.currentTimeMillis) } } - Some(jobState) } @@ -204,7 +206,7 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfi * This method can be called directly from outside, without affecting the current cachePool * result. */ - protected[kubernetes] def touchSessionAllJob( + private def touchSessionAllJob( @Nonnull clusterId: String, @Nonnull namespace: String, @Nonnull appId: Long, @@ -219,8 +221,8 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfi } else { jobDetails.map { d => - TrackId.onSession(namespace, clusterId, appId, d.jid, groupId) -> d - .toJobStatusCV(pollEmitTime, System.currentTimeMillis) + TrackId.onSession(namespace, clusterId, appId, d.jid, groupId) -> + d.toJobStatusCV(pollEmitTime, System.currentTimeMillis) } } } @@ -238,7 +240,7 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfi val namespace = trackId.namespace val jobDetails = listJobsDetails(ClusterKey(APPLICATION, namespace, clusterId)) if (jobDetails.isEmpty || jobDetails.get.jobs.isEmpty) { - inferApplicationFlinkJobStateFromK8sEvent(trackId) + inferJobStateFromK8sEvent(trackId) } else { Some(jobDetails.get.jobs.head.toJobStatusCV(pollEmitTime, System.currentTimeMillis)) } @@ -284,7 +286,7 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfi * Infer the current flink state from the last relevant k8s events. This method is only used for * application-mode job inference in case of a failed JM rest request. */ - private def inferApplicationFlinkJobStateFromK8sEvent(@Nonnull trackId: TrackId)(implicit + private def inferJobStateFromK8sEvent(@Nonnull trackId: TrackId)(implicit pollEmitTime: Long): Option[JobStatusCV] = { // infer from k8s deployment and event @@ -301,14 +303,13 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfi trackId.clusterId ) - val deployError = KubernetesDeploymentHelper.isDeploymentError( - trackId.namespace, - trackId.clusterId - ) - val isConnection = KubernetesDeploymentHelper.checkConnection() if (deployExists) { + val deployError = KubernetesDeploymentHelper.isDeploymentError( + trackId.namespace, + trackId.clusterId + ) if (!deployError) { logger.info("Task Enter the initialization process.") FlinkJobState.K8S_INITIALIZING