diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/FlinkClientEndpoint.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/FlinkClientEndpoint.scala index 752bdf0459..609d5cf661 100644 --- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/FlinkClientEndpoint.scala +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/FlinkClientEndpoint.scala @@ -42,59 +42,65 @@ object FlinkClientEndpoint { } ) - def submit(request: SubmitRequest): SubmitResponse = { - clients.get(request.executionMode) match { - case Some(client) => client.submit(request) + def submit(submitRequest: SubmitRequest): SubmitResponse = { + clients.get(submitRequest.executionMode) match { + case Some(client) => client.submit(submitRequest) case _ => - throw new UnsupportedOperationException(s"Unsupported ${request.executionMode} submit ") + throw new UnsupportedOperationException( + s"Unsupported ${submitRequest.executionMode} submit ") } } - def cancel(request: CancelRequest): CancelResponse = { - clients.get(request.executionMode) match { - case Some(client) => client.cancel(request) + def cancel(cancelRequest: CancelRequest): CancelResponse = { + clients.get(cancelRequest.executionMode) match { + case Some(client) => client.cancel(cancelRequest) case _ => - throw new UnsupportedOperationException(s"Unsupported ${request.executionMode} cancel ") + throw new UnsupportedOperationException( + s"Unsupported ${cancelRequest.executionMode} cancel ") } } - def triggerSavepoint(request: TriggerSavepointRequest): SavepointResponse = { - clients.get(request.executionMode) match { - case Some(client) => client.triggerSavepoint(request) + def triggerSavepoint(savepointRequest: TriggerSavepointRequest): SavepointResponse = { + clients.get(savepointRequest.executionMode) match { + case Some(client) => client.triggerSavepoint(savepointRequest) case _ => throw new UnsupportedOperationException( - s"Unsupported ${request.executionMode} triggerSavepoint ") + s"Unsupported ${savepointRequest.executionMode} triggerSavepoint ") } } - def deploy(request: DeployRequest): DeployResponse = { - request.executionMode match { - case YARN_SESSION => YarnSessionClient.deploy(request) - case KUBERNETES_NATIVE_SESSION => KubernetesNativeSessionClient.deploy(request) + def deploy(deployRequest: DeployRequest): DeployResponse = { + deployRequest.executionMode match { + case YARN_SESSION => YarnSessionClient.deploy(deployRequest) + case KUBERNETES_NATIVE_SESSION => + K8sFlinkConfig.isV2Enabled match { + case true => KubernetesSessionClientV2.deploy(deployRequest) + case _ => KubernetesNativeSessionClient.deploy(deployRequest) + } case _ => throw new UnsupportedOperationException( - s"Unsupported ${request.executionMode} deploy cluster ") + s"Unsupported ${deployRequest.executionMode} deploy cluster ") } } - def shutdown(request: ShutDownRequest): ShutDownResponse = { - request.executionMode match { - case YARN_SESSION => YarnSessionClient.shutdown(request) + def shutdown(shutDownRequest: ShutDownRequest): ShutDownResponse = { + shutDownRequest.executionMode match { + case YARN_SESSION => YarnSessionClient.shutdown(shutDownRequest) case KUBERNETES_NATIVE_SESSION => K8sFlinkConfig.isV2Enabled match { - case true => KubernetesSessionClientV2.shutdown(request) - case _ => KubernetesNativeSessionClient.shutdown(request) + case true => KubernetesSessionClientV2.shutdown(shutDownRequest) + case _ => KubernetesNativeSessionClient.shutdown(shutDownRequest) } case KUBERNETES_NATIVE_APPLICATION => K8sFlinkConfig.isV2Enabled match { - case true => KubernetesApplicationClientV2.shutdown(request) + case true => KubernetesApplicationClientV2.shutdown(shutDownRequest) case _ => throw new UnsupportedOperationException( - s"Unsupported ${request.executionMode} shutdown application ") + s"Unsupported ${shutDownRequest.executionMode} shutdown application ") } case _ => throw new UnsupportedOperationException( - s"Unsupported ${request.executionMode} shutdown cluster ") + s"Unsupported ${shutDownRequest.executionMode} shutdown cluster ") } } diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesSessionClientV2.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesSessionClientV2.scala index c40fba8999..de4dbe6cf3 100644 --- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesSessionClientV2.scala +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesSessionClientV2.scala @@ -35,6 +35,7 @@ import org.apache.flink.runtime.jobgraph.SavepointConfigOptions import org.apache.flink.v1beta1.FlinkDeploymentSpec.FlinkVersion import zio.ZIO +import scala.collection.convert.ImplicitConversions.`map AsScala` import scala.collection.mutable import scala.jdk.CollectionConverters.mapAsScalaMapConverter import scala.util.{Failure, Success, Try} @@ -125,7 +126,20 @@ object KubernetesSessionClientV2 extends KubernetesClientV2Trait with Logger { @throws[Throwable] def deploy(deployRequest: DeployRequest): DeployResponse = { - + logInfo( + s""" + |--------------------------------------- kubernetes session start --------------------------------------- + | userFlinkHome : ${deployRequest.flinkVersion.flinkHome} + | flinkVersion : ${deployRequest.flinkVersion.version} + | execMode : ${deployRequest.executionMode.name()} + | clusterId : ${deployRequest.clusterId} + | namespace : ${deployRequest.k8sDeployParam.kubernetesNamespace} + | exposedType : ${deployRequest.k8sDeployParam.flinkRestExposedType} + | serviceAccount : ${deployRequest.k8sDeployParam.serviceAccount} + | flinkImage : ${deployRequest.k8sDeployParam.flinkImage} + | properties : ${deployRequest.properties.mkString(" ")} + |------------------------------------------------------------------------------------------- + |""".stripMargin) val richMsg: String => String = s"[flink-submit][appId=${deployRequest.id}] " + _ val flinkConfig = diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala index 092488c474..7fb6aa1483 100644 --- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala @@ -209,7 +209,7 @@ trait FlinkClientTrait extends Logger { @throws[Exception] def doTriggerSavepoint( - request: TriggerSavepointRequest, + savepointRequest: TriggerSavepointRequest, flinkConf: Configuration): SavepointResponse @throws[Exception] diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesClientV2Trait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesClientV2Trait.scala index 17949cb37d..e853acdd19 100644 --- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesClientV2Trait.scala +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesClientV2Trait.scala @@ -139,20 +139,20 @@ trait KubernetesClientV2Trait extends FlinkClientTrait { @throws[Exception] override def doTriggerSavepoint( - triggerSavepointRequest: TriggerSavepointRequest, + savepointRequest: TriggerSavepointRequest, flinkConf: Configuration): SavepointResponse = { val savepointDef = JobSavepointDef( - savepointPath = Option(triggerSavepointRequest.savepointPath), - formatType = Option(triggerSavepointRequest.nativeFormat) + savepointPath = Option(savepointRequest.savepointPath), + formatType = Option(savepointRequest.nativeFormat) .map(if (_) JobSavepointDef.NATIVE_FORMAT else JobSavepointDef.CANONICAL_FORMAT) ) def richMsg: String => String = - s"[flink-trigger-savepoint][appId=${triggerSavepointRequest.id}] " + _ + s"[flink-trigger-savepoint][appId=${savepointRequest.id}] " + _ FlinkK8sOperator - .triggerJobSavepoint(triggerSavepointRequest.id, savepointDef) + .triggerJobSavepoint(savepointRequest.id, savepointDef) .flatMap { result => if (result.isFailed) ZIO.fail(TriggerJobSavepointFail(result.failureCause.get)) @@ -165,7 +165,7 @@ trait KubernetesClientV2Trait extends FlinkClientTrait { case Failure(err) => logError( richMsg( - s"Trigger flink job savepoint failed in ${triggerSavepointRequest.executionMode.getName}_V2 mode!"), + s"Trigger flink job savepoint failed in ${savepointRequest.executionMode.getName}_V2 mode!"), err) throw err } diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeClientTrait.scala index 712fd4bc84..49958a20cf 100644 --- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeClientTrait.scala +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeClientTrait.scala @@ -129,15 +129,16 @@ trait KubernetesNativeClientTrait extends FlinkClientTrait { @throws[Exception] override def doTriggerSavepoint( - request: TriggerSavepointRequest, + savepointRequest: TriggerSavepointRequest, flinkConfig: Configuration): SavepointResponse = { executeClientAction( - request, + savepointRequest, flinkConfig, (jobId, clusterClient) => { - val actionResult = super.triggerSavepoint(request, jobId, clusterClient) + val actionResult = super.triggerSavepoint(savepointRequest, jobId, clusterClient) SavepointResponse(actionResult) - }) + } + ) } // noinspection DuplicatedCode