Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Improve] Improve streampark-flink-client-core module base on Naming Style and add deploy log #3231

Merged
merged 1 commit into from
Oct 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,59 +42,65 @@ object FlinkClientEndpoint {
}
)

def submit(request: SubmitRequest): SubmitResponse = {
clients.get(request.executionMode) match {
case Some(client) => client.submit(request)
def submit(submitRequest: SubmitRequest): SubmitResponse = {
clients.get(submitRequest.executionMode) match {
case Some(client) => client.submit(submitRequest)
case _ =>
throw new UnsupportedOperationException(s"Unsupported ${request.executionMode} submit ")
throw new UnsupportedOperationException(
s"Unsupported ${submitRequest.executionMode} submit ")
}
}

def cancel(request: CancelRequest): CancelResponse = {
clients.get(request.executionMode) match {
case Some(client) => client.cancel(request)
def cancel(cancelRequest: CancelRequest): CancelResponse = {
clients.get(cancelRequest.executionMode) match {
case Some(client) => client.cancel(cancelRequest)
case _ =>
throw new UnsupportedOperationException(s"Unsupported ${request.executionMode} cancel ")
throw new UnsupportedOperationException(
s"Unsupported ${cancelRequest.executionMode} cancel ")
}
}

def triggerSavepoint(request: TriggerSavepointRequest): SavepointResponse = {
clients.get(request.executionMode) match {
case Some(client) => client.triggerSavepoint(request)
def triggerSavepoint(savepointRequest: TriggerSavepointRequest): SavepointResponse = {
clients.get(savepointRequest.executionMode) match {
case Some(client) => client.triggerSavepoint(savepointRequest)
case _ =>
throw new UnsupportedOperationException(
s"Unsupported ${request.executionMode} triggerSavepoint ")
s"Unsupported ${savepointRequest.executionMode} triggerSavepoint ")
}
}

def deploy(request: DeployRequest): DeployResponse = {
request.executionMode match {
case YARN_SESSION => YarnSessionClient.deploy(request)
case KUBERNETES_NATIVE_SESSION => KubernetesNativeSessionClient.deploy(request)
def deploy(deployRequest: DeployRequest): DeployResponse = {
deployRequest.executionMode match {
case YARN_SESSION => YarnSessionClient.deploy(deployRequest)
case KUBERNETES_NATIVE_SESSION =>
K8sFlinkConfig.isV2Enabled match {
case true => KubernetesSessionClientV2.deploy(deployRequest)
case _ => KubernetesNativeSessionClient.deploy(deployRequest)
}
case _ =>
throw new UnsupportedOperationException(
s"Unsupported ${request.executionMode} deploy cluster ")
s"Unsupported ${deployRequest.executionMode} deploy cluster ")
}
}

def shutdown(request: ShutDownRequest): ShutDownResponse = {
request.executionMode match {
case YARN_SESSION => YarnSessionClient.shutdown(request)
def shutdown(shutDownRequest: ShutDownRequest): ShutDownResponse = {
shutDownRequest.executionMode match {
case YARN_SESSION => YarnSessionClient.shutdown(shutDownRequest)
case KUBERNETES_NATIVE_SESSION =>
K8sFlinkConfig.isV2Enabled match {
case true => KubernetesSessionClientV2.shutdown(request)
case _ => KubernetesNativeSessionClient.shutdown(request)
case true => KubernetesSessionClientV2.shutdown(shutDownRequest)
case _ => KubernetesNativeSessionClient.shutdown(shutDownRequest)
}
case KUBERNETES_NATIVE_APPLICATION =>
K8sFlinkConfig.isV2Enabled match {
case true => KubernetesApplicationClientV2.shutdown(request)
case true => KubernetesApplicationClientV2.shutdown(shutDownRequest)
case _ =>
throw new UnsupportedOperationException(
s"Unsupported ${request.executionMode} shutdown application ")
s"Unsupported ${shutDownRequest.executionMode} shutdown application ")
}
case _ =>
throw new UnsupportedOperationException(
s"Unsupported ${request.executionMode} shutdown cluster ")
s"Unsupported ${shutDownRequest.executionMode} shutdown cluster ")
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import org.apache.flink.runtime.jobgraph.SavepointConfigOptions
import org.apache.flink.v1beta1.FlinkDeploymentSpec.FlinkVersion
import zio.ZIO

import scala.collection.convert.ImplicitConversions.`map AsScala`
import scala.collection.mutable
import scala.jdk.CollectionConverters.mapAsScalaMapConverter
import scala.util.{Failure, Success, Try}
Expand Down Expand Up @@ -125,7 +126,20 @@ object KubernetesSessionClientV2 extends KubernetesClientV2Trait with Logger {

@throws[Throwable]
def deploy(deployRequest: DeployRequest): DeployResponse = {

logInfo(
s"""
|--------------------------------------- kubernetes session start ---------------------------------------
| userFlinkHome : ${deployRequest.flinkVersion.flinkHome}
| flinkVersion : ${deployRequest.flinkVersion.version}
| execMode : ${deployRequest.executionMode.name()}
| clusterId : ${deployRequest.clusterId}
| namespace : ${deployRequest.k8sDeployParam.kubernetesNamespace}
| exposedType : ${deployRequest.k8sDeployParam.flinkRestExposedType}
| serviceAccount : ${deployRequest.k8sDeployParam.serviceAccount}
| flinkImage : ${deployRequest.k8sDeployParam.flinkImage}
| properties : ${deployRequest.properties.mkString(" ")}
|-------------------------------------------------------------------------------------------
|""".stripMargin)
val richMsg: String => String = s"[flink-submit][appId=${deployRequest.id}] " + _

val flinkConfig =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ trait FlinkClientTrait extends Logger {

@throws[Exception]
def doTriggerSavepoint(
request: TriggerSavepointRequest,
savepointRequest: TriggerSavepointRequest,
flinkConf: Configuration): SavepointResponse

@throws[Exception]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,20 +139,20 @@ trait KubernetesClientV2Trait extends FlinkClientTrait {

@throws[Exception]
override def doTriggerSavepoint(
triggerSavepointRequest: TriggerSavepointRequest,
savepointRequest: TriggerSavepointRequest,
flinkConf: Configuration): SavepointResponse = {

val savepointDef = JobSavepointDef(
savepointPath = Option(triggerSavepointRequest.savepointPath),
formatType = Option(triggerSavepointRequest.nativeFormat)
savepointPath = Option(savepointRequest.savepointPath),
formatType = Option(savepointRequest.nativeFormat)
.map(if (_) JobSavepointDef.NATIVE_FORMAT else JobSavepointDef.CANONICAL_FORMAT)
)

def richMsg: String => String =
s"[flink-trigger-savepoint][appId=${triggerSavepointRequest.id}] " + _
s"[flink-trigger-savepoint][appId=${savepointRequest.id}] " + _

FlinkK8sOperator
.triggerJobSavepoint(triggerSavepointRequest.id, savepointDef)
.triggerJobSavepoint(savepointRequest.id, savepointDef)
.flatMap {
result =>
if (result.isFailed) ZIO.fail(TriggerJobSavepointFail(result.failureCause.get))
Expand All @@ -165,7 +165,7 @@ trait KubernetesClientV2Trait extends FlinkClientTrait {
case Failure(err) =>
logError(
richMsg(
s"Trigger flink job savepoint failed in ${triggerSavepointRequest.executionMode.getName}_V2 mode!"),
s"Trigger flink job savepoint failed in ${savepointRequest.executionMode.getName}_V2 mode!"),
err)
throw err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,15 +129,16 @@ trait KubernetesNativeClientTrait extends FlinkClientTrait {

@throws[Exception]
override def doTriggerSavepoint(
request: TriggerSavepointRequest,
savepointRequest: TriggerSavepointRequest,
flinkConfig: Configuration): SavepointResponse = {
executeClientAction(
request,
savepointRequest,
flinkConfig,
(jobId, clusterClient) => {
val actionResult = super.triggerSavepoint(request, jobId, clusterClient)
val actionResult = super.triggerSavepoint(savepointRequest, jobId, clusterClient)
SavepointResponse(actionResult)
})
}
)
}

// noinspection DuplicatedCode
Expand Down