From 13ac2c6603fdeb2fe774cfd28d52e8c31256a676 Mon Sep 17 00:00:00 2001 From: zhangyuhang <2827528315@qq.com> Date: Thu, 19 Dec 2024 11:47:24 +0800 Subject: [PATCH] [Bug] Fix the issue where metrics do not update upon restart. (#4148) Co-authored-by: yuhang2.zhang --- .../flink/kubernetes/watcher/FlinkMetricsWatcher.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkMetricsWatcher.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkMetricsWatcher.scala index 3311d67f03..2802d6d005 100644 --- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkMetricsWatcher.scala +++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkMetricsWatcher.scala @@ -89,14 +89,14 @@ class FlinkMetricWatcher(conf: MetricWatcherConfig = MetricWatcherConfig.default future.onComplete(_.getOrElse(None) match { case Some(metric) => val clusterKey = id.toClusterKey - // update current flink cluster metrics on cache - watchController.flinkMetrics.put(clusterKey, metric) val isMetricChanged = { val preMetric = watchController.flinkMetrics.get(clusterKey) preMetric == null || !preMetric.equalsPayload(metric) } if (isMetricChanged) { eventBus.postAsync(FlinkClusterMetricChangeEvent(id, metric)) + // update current flink cluster metrics on cache + watchController.flinkMetrics.put(clusterKey, metric) } case _ => })