diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesApplicationClientV2.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesApplicationClientV2.scala index 65985ca5a5..db2b3953cf 100644 --- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesApplicationClientV2.scala +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesApplicationClientV2.scala @@ -21,10 +21,11 @@ import org.apache.streampark.common.util.Logger import org.apache.streampark.common.zio.ZIOExt.{IOOps, OptionZIOOps} import org.apache.streampark.flink.client.`trait`.KubernetesClientV2Trait import org.apache.streampark.flink.client.bean._ -import org.apache.streampark.flink.client.impl.KubernetesSessionClientV2.{logError, logInfo} import org.apache.streampark.flink.kubernetes.v2.model.{FlinkDeploymentDef, JobManagerDef, TaskManagerDef} +import org.apache.streampark.flink.kubernetes.v2.model.TrackKey.ApplicationJobKey import org.apache.streampark.flink.kubernetes.v2.observer.FlinkK8sObserver import org.apache.streampark.flink.kubernetes.v2.operator.FlinkK8sOperator +import org.apache.streampark.flink.kubernetes.v2.operator.OprError.{FlinkResourceNotFound, UnsupportedAction} import org.apache.streampark.flink.packer.pipeline.K8sAppModeBuildResponse import org.apache.commons.lang3.StringUtils @@ -33,6 +34,7 @@ import org.apache.flink.configuration._ import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions import org.apache.flink.runtime.jobgraph.SavepointConfigOptions import org.apache.flink.v1beta1.FlinkDeploymentSpec.FlinkVersion +import zio.ZIO import scala.collection.mutable import scala.jdk.CollectionConverters.mapAsScalaMapConverter @@ -40,7 +42,6 @@ import scala.util.{Failure, Success, Try} /** Flink K8s application mode task operation client via Flink K8s Operator */ object KubernetesApplicationClientV2 extends KubernetesClientV2Trait with Logger { - private val observer = FlinkK8sObserver @throws[Throwable] override def doSubmit( @@ -248,19 +249,24 @@ object KubernetesApplicationClientV2 extends KubernetesClientV2Trait with Logger def shutdown(shutDownRequest: ShutDownRequest): ShutDownResponse = { val name = shutDownRequest.clusterId val namespace = shutDownRequest.kubernetesDeployParam.kubernetesNamespace - def richMsg: String => String = s"[flink-shutdown][clusterId=$name][namespace=$namespace] " + _ - FlinkK8sOperator.k8sCrOpr.deleteDeployment(namespace, name).runIOAsTry match { - case Success(_) => - observer.trackedKeys - .find(_.id == shutDownRequest.id) - .someOrUnitZIO(key => observer.untrack(key)) - logInfo(richMsg("Shutdown Flink Applicaition successfully.")) - ShutDownResponse() + FlinkK8sObserver.trackedKeys + .find { + case ApplicationJobKey(_, ns, n) => ns == namespace && n == name + case _ => false + } + .someOrUnitZIO(key => FlinkK8sOperator.delete(key.id)) + .catchSome { + case _: FlinkResourceNotFound => ZIO.unit + case _: UnsupportedAction => ZIO.unit + } + .as(ShutDownResponse()) + .runIOAsTry match { + case Success(result) => + logInfo(richMsg("Shutdown Flink Application deployment successfully.")); result case Failure(err) => - logError(richMsg(s"Fail to shutdown Flink Application"), err) - throw err + logError(richMsg(s"Fail to shutdown Flink Application deployment"), err); throw err } }