Skip to content

Commit

Permalink
[Improve] Improve streampark-flink-client-core module based on Naming …
Browse files Browse the repository at this point in the history
…Style and add log
  • Loading branch information
caicancai committed Oct 10, 2023
1 parent 1f35c80 commit 152dd8e
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,59 +42,65 @@ object FlinkClientEndpoint {
}
)

/**
 * Submit a Flink job via the client registered for the request's execution mode.
 *
 * @param submitRequest the job submission request; its `executionMode` selects the client
 * @return the response produced by the matching client's `submit`
 * @throws UnsupportedOperationException if no client is registered for the execution mode
 */
def submit(submitRequest: SubmitRequest): SubmitResponse = {
  clients.get(submitRequest.executionMode) match {
    case Some(client) => client.submit(submitRequest)
    case _ =>
      throw new UnsupportedOperationException(
        s"Unsupported ${submitRequest.executionMode} submit ")
  }
}

/**
 * Cancel a running Flink job via the client registered for the request's execution mode.
 *
 * @param cancelRequest the cancel request; its `executionMode` selects the client
 * @return the response produced by the matching client's `cancel`
 * @throws UnsupportedOperationException if no client is registered for the execution mode
 */
def cancel(cancelRequest: CancelRequest): CancelResponse = {
  clients.get(cancelRequest.executionMode) match {
    case Some(client) => client.cancel(cancelRequest)
    case _ =>
      throw new UnsupportedOperationException(
        s"Unsupported ${cancelRequest.executionMode} cancel ")
  }
}

/**
 * Trigger a savepoint for a running Flink job via the client registered for the
 * request's execution mode.
 *
 * @param savepointRequest the savepoint request; its `executionMode` selects the client
 * @return the response produced by the matching client's `triggerSavepoint`
 * @throws UnsupportedOperationException if no client is registered for the execution mode
 */
def triggerSavepoint(savepointRequest: TriggerSavepointRequest): SavepointResponse = {
  clients.get(savepointRequest.executionMode) match {
    case Some(client) => client.triggerSavepoint(savepointRequest)
    case _ =>
      throw new UnsupportedOperationException(
        s"Unsupported ${savepointRequest.executionMode} triggerSavepoint ")
  }
}

/**
 * Deploy a session cluster for the request's execution mode.
 *
 * Only session modes are supported here: YARN session, and Kubernetes native
 * session (routed to the V2 client when `K8sFlinkConfig.isV2Enabled` is set).
 *
 * @param deployRequest the cluster deploy request
 * @return the response produced by the selected session client's `deploy`
 * @throws UnsupportedOperationException for any non-session execution mode
 */
def deploy(deployRequest: DeployRequest): DeployResponse = {
  deployRequest.executionMode match {
    case YARN_SESSION => YarnSessionClient.deploy(deployRequest)
    case KUBERNETES_NATIVE_SESSION =>
      // Prefer the V2 Kubernetes client when the flag is enabled; fall back to the
      // legacy native-session client otherwise. (An if/else is clearer than
      // pattern-matching on a Boolean.)
      if (K8sFlinkConfig.isV2Enabled) KubernetesSessionClientV2.deploy(deployRequest)
      else KubernetesNativeSessionClient.deploy(deployRequest)
    case _ =>
      throw new UnsupportedOperationException(
        s"Unsupported ${deployRequest.executionMode} deploy cluster ")
  }
}

/**
 * Shut down a Flink cluster for the request's execution mode.
 *
 * Supported modes: YARN session, Kubernetes native session (V2 client when
 * `K8sFlinkConfig.isV2Enabled`), and Kubernetes native application — the latter
 * only when the V2 client is enabled.
 *
 * @param shutDownRequest the cluster shutdown request
 * @return the response produced by the selected client's `shutdown`
 * @throws UnsupportedOperationException for unsupported execution modes, or for
 *         Kubernetes native application mode when the V2 client is disabled
 */
def shutdown(shutDownRequest: ShutDownRequest): ShutDownResponse = {
  shutDownRequest.executionMode match {
    case YARN_SESSION => YarnSessionClient.shutdown(shutDownRequest)
    case KUBERNETES_NATIVE_SESSION =>
      // V2 client takes precedence when enabled; otherwise use the legacy client.
      if (K8sFlinkConfig.isV2Enabled) KubernetesSessionClientV2.shutdown(shutDownRequest)
      else KubernetesNativeSessionClient.shutdown(shutDownRequest)
    case KUBERNETES_NATIVE_APPLICATION =>
      // Application-mode shutdown is only implemented by the V2 client.
      if (K8sFlinkConfig.isV2Enabled) {
        KubernetesApplicationClientV2.shutdown(shutDownRequest)
      } else {
        throw new UnsupportedOperationException(
          s"Unsupported ${shutDownRequest.executionMode} shutdown application ")
      }
    case _ =>
      throw new UnsupportedOperationException(
        s"Unsupported ${shutDownRequest.executionMode} shutdown cluster ")
  }
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import org.apache.flink.runtime.jobgraph.SavepointConfigOptions
import org.apache.flink.v1beta1.FlinkDeploymentSpec.FlinkVersion
import zio.ZIO

import scala.collection.convert.ImplicitConversions.`map AsScala`
import scala.collection.mutable
import scala.jdk.CollectionConverters.mapAsScalaMapConverter
import scala.util.{Failure, Success, Try}
Expand Down Expand Up @@ -125,7 +126,20 @@ object KubernetesSessionClientV2 extends KubernetesClientV2Trait with Logger {

@throws[Throwable]
def deploy(deployRequest: DeployRequest): DeployResponse = {

logInfo(
s"""
|--------------------------------------- kubernetes session start ---------------------------------------
| userFlinkHome : ${deployRequest.flinkVersion.flinkHome}
| flinkVersion : ${deployRequest.flinkVersion.version}
| execMode : ${deployRequest.executionMode.name()}
| clusterId : ${deployRequest.clusterId}
| namespace : ${deployRequest.k8sDeployParam.kubernetesNamespace}
| exposedType : ${deployRequest.k8sDeployParam.flinkRestExposedType}
| serviceAccount : ${deployRequest.k8sDeployParam.serviceAccount}
| flinkImage : ${deployRequest.k8sDeployParam.flinkImage}
| properties : ${deployRequest.properties.mkString(" ")}
|-------------------------------------------------------------------------------------------
|""".stripMargin)
val richMsg: String => String = s"[flink-submit][appId=${deployRequest.id}] " + _

val flinkConfig =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ trait FlinkClientTrait extends Logger {

/**
 * Implementation hook: trigger a savepoint for the job described by the request,
 * using the given (already-built) Flink configuration.
 *
 * @param savepointRequest the savepoint request to act on
 * @param flinkConf        the effective Flink configuration for this action
 * @return the savepoint response
 * @throws Exception if the savepoint cannot be triggered
 */
@throws[Exception]
def doTriggerSavepoint(
    savepointRequest: TriggerSavepointRequest,
    flinkConf: Configuration): SavepointResponse

@throws[Exception]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,20 +139,20 @@ trait KubernetesClientV2Trait extends FlinkClientTrait {

@throws[Exception]
override def doTriggerSavepoint(
triggerSavepointRequest: TriggerSavepointRequest,
savepointRequest: TriggerSavepointRequest,
flinkConf: Configuration): SavepointResponse = {

val savepointDef = JobSavepointDef(
savepointPath = Option(triggerSavepointRequest.savepointPath),
formatType = Option(triggerSavepointRequest.nativeFormat)
savepointPath = Option(savepointRequest.savepointPath),
formatType = Option(savepointRequest.nativeFormat)
.map(if (_) JobSavepointDef.NATIVE_FORMAT else JobSavepointDef.CANONICAL_FORMAT)
)

def richMsg: String => String =
s"[flink-trigger-savepoint][appId=${triggerSavepointRequest.id}] " + _
s"[flink-trigger-savepoint][appId=${savepointRequest.id}] " + _

FlinkK8sOperator
.triggerJobSavepoint(triggerSavepointRequest.id, savepointDef)
.triggerJobSavepoint(savepointRequest.id, savepointDef)
.flatMap {
result =>
if (result.isFailed) ZIO.fail(TriggerJobSavepointFail(result.failureCause.get))
Expand All @@ -165,7 +165,7 @@ trait KubernetesClientV2Trait extends FlinkClientTrait {
case Failure(err) =>
logError(
richMsg(
s"Trigger flink job savepoint failed in ${triggerSavepointRequest.executionMode.getName}_V2 mode!"),
s"Trigger flink job savepoint failed in ${savepointRequest.executionMode.getName}_V2 mode!"),
err)
throw err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,15 +129,16 @@ trait KubernetesNativeClientTrait extends FlinkClientTrait {

/**
 * Trigger a savepoint on a Kubernetes-native cluster by delegating to
 * `executeClientAction`, which resolves the job id and cluster client, then
 * invoking the shared `triggerSavepoint` helper.
 *
 * @param savepointRequest the savepoint request to act on
 * @param flinkConfig      the effective Flink configuration for this action
 * @return the savepoint response wrapping the trigger result
 * @throws Exception if the client action or the savepoint trigger fails
 */
@throws[Exception]
override def doTriggerSavepoint(
    savepointRequest: TriggerSavepointRequest,
    flinkConfig: Configuration): SavepointResponse = {
  executeClientAction(
    savepointRequest,
    flinkConfig,
    (jobId, clusterClient) => {
      val actionResult = super.triggerSavepoint(savepointRequest, jobId, clusterClient)
      SavepointResponse(actionResult)
    }
  )
}

// noinspection DuplicatedCode
Expand Down

0 comments on commit 152dd8e

Please sign in to comment.