Skip to content

Commit

Permalink
Imoprove KubernetesApplicationClientV2.scala at PR#3186
Browse files Browse the repository at this point in the history
  • Loading branch information
Al-assad authored Oct 8, 2023
1 parent 8d9cafb commit a993111
Showing 1 changed file with 18 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@ import org.apache.streampark.common.util.Logger
import org.apache.streampark.common.zio.ZIOExt.{IOOps, OptionZIOOps}
import org.apache.streampark.flink.client.`trait`.KubernetesClientV2Trait
import org.apache.streampark.flink.client.bean._
import org.apache.streampark.flink.client.impl.KubernetesSessionClientV2.{logError, logInfo}
import org.apache.streampark.flink.kubernetes.v2.model.{FlinkDeploymentDef, JobManagerDef, TaskManagerDef}
import org.apache.streampark.flink.kubernetes.v2.model.TrackKey.ApplicationJobKey
import org.apache.streampark.flink.kubernetes.v2.observer.FlinkK8sObserver
import org.apache.streampark.flink.kubernetes.v2.operator.FlinkK8sOperator
import org.apache.streampark.flink.kubernetes.v2.operator.OprError.{FlinkResourceNotFound, UnsupportedAction}
import org.apache.streampark.flink.packer.pipeline.K8sAppModeBuildResponse

import org.apache.commons.lang3.StringUtils
Expand All @@ -33,14 +34,14 @@ import org.apache.flink.configuration._
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions
import org.apache.flink.v1beta1.FlinkDeploymentSpec.FlinkVersion
import zio.ZIO

import scala.collection.mutable
import scala.jdk.CollectionConverters.mapAsScalaMapConverter
import scala.util.{Failure, Success, Try}

/** Flink K8s application mode task operation client via Flink K8s Operator */
object KubernetesApplicationClientV2 extends KubernetesClientV2Trait with Logger {
private val observer = FlinkK8sObserver

@throws[Throwable]
override def doSubmit(
Expand Down Expand Up @@ -248,19 +249,24 @@ object KubernetesApplicationClientV2 extends KubernetesClientV2Trait with Logger
def shutdown(shutDownRequest: ShutDownRequest): ShutDownResponse = {
val name = shutDownRequest.clusterId
val namespace = shutDownRequest.kubernetesDeployParam.kubernetesNamespace

def richMsg: String => String = s"[flink-shutdown][clusterId=$name][namespace=$namespace] " + _

FlinkK8sOperator.k8sCrOpr.deleteDeployment(namespace, name).runIOAsTry match {
case Success(_) =>
observer.trackedKeys
.find(_.id == shutDownRequest.id)
.someOrUnitZIO(key => observer.untrack(key))
logInfo(richMsg("Shutdown Flink Applicaition successfully."))
ShutDownResponse()
FlinkK8sObserver.trackedKeys
.find {
case ApplicationJobKey(_, ns, n) => ns == namespace && n == name
case _ => false
}
.someOrUnitZIO(key => FlinkK8sOperator.delete(key.id))
.catchSome {
case _: FlinkResourceNotFound => ZIO.unit
case _: UnsupportedAction => ZIO.unit
}
.as(ShutDownResponse())
.runIOAsTry match {
case Success(result) =>
logInfo(richMsg("Shutdown Flink Application deployment successfully.")); result
case Failure(err) =>
logError(richMsg(s"Fail to shutdown Flink Application"), err)
throw err
logError(richMsg(s"Fail to shutdown Flink Application deployment"), err); throw err
}
}

Expand Down

0 comments on commit a993111

Please sign in to comment.